From dd2a83b0df57adf1be0b6626bf03ac057e8acb38 Mon Sep 17 00:00:00 2001 From: Dennis Felsing Date: Thu, 4 Dec 2025 19:17:21 +0000 Subject: [PATCH 1/2] parallel-workload: More complex UNION/Record/List --- .../materialize/data_ingest/data_type.py | 113 +++++++++++++++++- .../materialize/parallel_workload/action.py | 58 ++++++--- .../materialize/parallel_workload/database.py | 7 +- .../parallel_workload/expression.py | 20 +++- 4 files changed, 176 insertions(+), 22 deletions(-) diff --git a/misc/python/materialize/data_ingest/data_type.py b/misc/python/materialize/data_ingest/data_type.py index 2b420d5d7a69d..629fd93884010 100644 --- a/misc/python/materialize/data_ingest/data_type.py +++ b/misc/python/materialize/data_ingest/data_type.py @@ -514,7 +514,7 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str: return "string" -class Bytea(Text): +class Bytea(DataType): @staticmethod def name(backend: Backend = Backend.MATERIALIZE) -> str: if backend == Backend.AVRO: @@ -524,6 +524,44 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str: else: return "bytea" + @staticmethod + def random_value( + rng: random.Random, + record_size: RecordSize = RecordSize.LARGE, + in_query: bool = False, + ) -> Any: + if rng.randrange(10) == 0: + result = rng.choice( + [ + "NULL", + "0.0", + "True", + # "", + "表ポあA鷗ŒéB逍Üߪąñ丂㐀𠀀", + rng.randint(-100, 100), + ] + ) + # Fails: unterminated dollar-quoted string + # chars = string.printable + chars = string.ascii_letters + string.digits + if record_size == RecordSize.TINY: + result = rng.choice(("foo", "bar", "baz")) + elif record_size == RecordSize.SMALL: + result = "".join(rng.choice(chars) for _ in range(3)) + elif record_size == RecordSize.MEDIUM: + result = "".join(rng.choice(chars) for _ in range(10)) + elif record_size == RecordSize.LARGE: + result = "".join(rng.choice(chars) for _ in range(100)) + else: + raise ValueError(f"Unexpected record size {record_size}") + + return f"{literal(str(result))}::bytea" if in_query else str(result) + + @staticmethod + def numeric_value(num: int, in_query: bool = False) -> Any: + result = f"key{num}" + return f"'{result}'::bytea" if in_query else str(result) + class UUID(DataType): @staticmethod @@ -705,6 +743,66 @@ def numeric_value(num: int, in_query: bool = False) -> Any: return f"'{values_str}'::int list" if in_query else values_str +class RecordList(DataType): + @staticmethod + def name(backend: Backend = Backend.MATERIALIZE) -> str: + if backend == Backend.AVRO: + raise ValueError("Unsupported") + elif backend == Backend.JSON: + raise ValueError("Unsupported") + else: + return "record list" + + @staticmethod + def random_value( + rng: random.Random, + record_size: RecordSize = RecordSize.LARGE, + in_query: bool = False, + ) -> Any: + if record_size == RecordSize.TINY: + key_range = 1 + elif record_size == RecordSize.SMALL: + key_range = 5 + elif record_size == RecordSize.MEDIUM: + key_range = 10 + elif record_size == RecordSize.LARGE: + key_range = 20 + else: + raise ValueError(f"Unexpected record size {record_size}") + values = [f"row({rng.randint(-100, 100)})" for i in range(0, key_range)] + return f"list[{', '.join(values)}]" + + @staticmethod + def numeric_value(num: int, in_query: bool = False) -> Any: + values = [f"row({i})" for i in range(0, num)] + return f"list[{', '.join(values)}]" + + +class Record(DataType): + @staticmethod + def name(backend: Backend = Backend.MATERIALIZE) -> str: + if backend == Backend.AVRO: + raise ValueError("Unsupported") + elif backend == Backend.JSON: + raise ValueError("Unsupported") + else: + return "record" + + @staticmethod + def random_value( + rng: random.Random, + record_size: RecordSize = RecordSize.LARGE, + in_query: bool = False, + ) -> Any: + value = str(rng.choice(["null::integer", "1"])) + return f"row({value})" + + @staticmethod + def numeric_value(num: int, in_query: bool = False) -> Any: + value = str(num) + return f"row({value})" + + class Timestamp(DataType): @staticmethod def random_value( @@ -889,6 +987,8 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str: # Sort to keep determinism for reproducible runs with specific seed DATA_TYPES = sorted(list(all_subclasses(DataType)), key=repr) +DATA_TYPES_FOR_COLUMNS = sorted(list(set(DATA_TYPES) - {Record, RecordList}), key=repr) + # fastavro._schema_common.UnknownType: record # bytea requires Python bytes type instead of str DATA_TYPES_FOR_AVRO = sorted( @@ -902,6 +1002,8 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str: UUID, Interval, IntList, + RecordList, + Record, IntArray, Time, Date, @@ -925,6 +1027,8 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str: set(DATA_TYPES) - { IntList, + RecordList, + Record, IntArray, UUID, TextTextMap, @@ -948,6 +1052,8 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str: set(DATA_TYPES) - { IntList, + RecordList, + Record, IntArray, UUID, TextTextMap, @@ -974,7 +1080,10 @@ def name(backend: Backend = Backend.MATERIALIZE) -> str: # MySQL doesn't support keys of unlimited size DATA_TYPES_FOR_KEY = sorted( - list(set(DATA_TYPES_FOR_AVRO) - {Text, Bytea, IntList, IntArray, Float, Double}), + list( + set(DATA_TYPES_FOR_AVRO) + - {Text, Bytea, IntList, RecordList, Record, IntArray, Float, Double} + ), key=repr, ) diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py index e008fe72f6e76..0caab7df6dfa9 100644 --- a/misc/python/materialize/parallel_workload/action.py +++ b/misc/python/materialize/parallel_workload/action.py @@ -26,6 +26,7 @@ import materialize.parallel_workload.database from materialize.data_ingest.data_type import ( + DATA_TYPES, NUMBER_TYPES, Boolean, Text, @@ -42,7 +43,6 @@ ) from materialize.mzcompose.services.minio import minio_blob_uri from materialize.parallel_workload.database import ( - DATA_TYPES, DB, MAX_CLUSTERS, MAX_COLUMNS, @@ -239,24 +239,27 @@ def generate_select_query(self, exe: Executor, expr_kind: ExprKind) -> str: join = obj_name != obj2_name and obj not in exe.db.views and columns - if join: - all_columns = list(obj.columns) + list(obj2.columns) - else: - all_columns = obj.columns + all_columns = list(obj.columns) + list(obj2.columns) if join else obj.columns + column_types = [] if self.rng.random() < 0.9: + column_types = [ + self.rng.choice(list(DATA_TYPES)) + for i in range(self.rng.randint(1, 10)) + ] expressions = ", ".join( [ expression( - self.rng.choice(list(DATA_TYPES)), + column_type, all_columns, self.rng, expr_kind, ) - for i in range(self.rng.randint(1, 10)) + for column_type in column_types ] ) if self.rng.choice([True, False]): + column_types = [] column1 = self.rng.choice(all_columns) column2 = self.rng.choice(all_columns) column3 = self.rng.choice(all_columns) @@ -299,17 +302,42 @@ def generate_select_query(self, exe: Executor, expr_kind: ExprKind) -> str: if self.rng.choice([True, False]): query += f" WHERE {expression(Boolean, all_columns, self.rng, expr_kind)}" - if self.rng.choice([True, False]): - query += f" UNION ALL SELECT {expressions} FROM {obj_name}" + if bool(column_types) and self.rng.choice([True, False]): + obj3 = self.rng.choice(exe.db.db_objects()) + obj3_name = str(obj3) + column3 = self.rng.choice(obj3.columns) + obj4 = self.rng.choice(exe.db.db_objects()) + obj4_name = str(obj4) + columns_union = [ + c + for c in obj4.columns + if c.data_type == column3.data_type and c.data_type != TextTextMap + ] + join_union = ( + obj3_name != obj4_name and obj3 not in exe.db.views and columns_union + ) + all_columns_union = ( + list(obj3.columns) + list(obj4.columns) if join_union else obj3.columns + ) + expressions3 = ", ".join( + [ + expression( + column_type, + all_columns_union, + self.rng, + expr_kind, + ) + for column_type in column_types + ] + ) + query += f" UNION ALL SELECT {expressions3} FROM {obj3_name}" - if join: - column2 = self.rng.choice(columns) - query += f" JOIN {obj2_name} ON {column} = {column2}" + if join_union: + column4 = self.rng.choice(columns_union) + query += f" JOIN {obj4_name} ON {column3} = {column4}" if self.rng.choice([True, False]): - query += ( - f" WHERE {expression(Boolean, all_columns, self.rng, expr_kind)}" - ) + query += f" WHERE {expression(Boolean, all_columns_union, self.rng, expr_kind)}" query += f" LIMIT {self.rng.randint(0, 100)}" return query diff --git a/misc/python/materialize/parallel_workload/database.py b/misc/python/materialize/parallel_workload/database.py index fe5154f1fc6d7..5442f7ad246dd 100644 --- a/misc/python/materialize/parallel_workload/database.py +++ b/misc/python/materialize/parallel_workload/database.py @@ -15,8 +15,8 @@ from pg8000.native import identifier, literal from materialize.data_ingest.data_type import ( - DATA_TYPES, DATA_TYPES_FOR_AVRO, + DATA_TYPES_FOR_COLUMNS, DATA_TYPES_FOR_KEY, DATA_TYPES_FOR_MYSQL, DATA_TYPES_FOR_SQL_SERVER, @@ -174,7 +174,7 @@ def __init__(self, rng: random.Random, table_id: int, schema: Schema): self.table_id = table_id self.schema = schema self.columns = [ - Column(rng, i, rng.choice(DATA_TYPES), self) + Column(rng, i, rng.choice(DATA_TYPES_FOR_COLUMNS), self) for i in range(rng.randint(2, MAX_COLUMNS)) ] self.num_rows = 0 @@ -225,7 +225,8 @@ def __init__( list(base_object2.columns) if base_object2 else [] ) self.data_types = [ - rng.choice(list(DATA_TYPES)) for i in range(rng.randint(1, MAX_COLUMNS)) + rng.choice(list(DATA_TYPES_FOR_COLUMNS)) + for i in range(rng.randint(1, MAX_COLUMNS)) ] self.expressions = [ expression(data_type, all_columns, rng, kind=ExprKind.MATERIALIZABLE) diff --git a/misc/python/materialize/parallel_workload/expression.py b/misc/python/materialize/parallel_workload/expression.py index f0dffcd8f328e..04eadf8c60c7c 100644 --- a/misc/python/materialize/parallel_workload/expression.py +++ b/misc/python/materialize/parallel_workload/expression.py @@ -27,6 +27,8 @@ MzTimestamp, Numeric, Numeric383, + Record, + RecordList, RecordSize, Text, TextTextMap, @@ -74,13 +76,22 @@ def __init__(self, text: str, params: list, unsupported: ExprKind = ExprKind.ALL if dt != Bytea: FUNC_OPS[Text] += [FuncOp("cast({} as text)", [dt])] - if dt not in (IntList, IntArray, TextTextMap, Bytea, Jsonb, Text): + if dt not in ( + IntList, + RecordList, + Record, + IntArray, + TextTextMap, + Bytea, + Jsonb, + Text, + ): FUNC_OPS[Text] += [ FuncOp("{} || {}", [dt, Text]), FuncOp("{} || {}", [Text, dt]), ] - if dt not in (IntList, IntArray, TextTextMap, Bytea, Jsonb): + if dt not in (IntList, RecordList, Record, IntArray, TextTextMap, Bytea, Jsonb): FUNC_OPS[Boolean] += [ FuncOp("{} > {}", [dt, dt]), FuncOp("{} < {}", [dt, dt]), @@ -182,6 +193,10 @@ def __init__(self, text: str, params: list, unsupported: ExprKind = ExprKind.ALL FuncOp("mz_is_superuser()", [], unsupported=ExprKind.MATERIALIZABLE), ] +FUNC_OPS[Record] += [FuncOp("row{}", [Int])] + +FUNC_OPS[RecordList] += [FuncOp("list[{}]", [Record])] + FUNC_OPS[Text] += [ FuncOp("lower{}", [Text]), FuncOp("upper{}", [Text]), @@ -213,6 +228,7 @@ def __init__(self, text: str, params: list, unsupported: ExprKind = ExprKind.ALL FuncOp("char_length{}", [Text]), FuncOp("map_length{}", [TextTextMap]), FuncOp("list_length{}", [IntList]), + FuncOp("list_length{}", [RecordList]), FuncOp("mz_version_num()", [], unsupported=ExprKind.MATERIALIZABLE), FuncOp("pg_backend_pid()", [], unsupported=ExprKind.MATERIALIZABLE), FuncOp("{} - {}", [Date, Date]), From 5c6d2db949e10ebcbe5d60be178ac8ad269a2525 Mon Sep 17 00:00:00 2001 From: Gabor Gevay Date: Wed, 3 Dec 2025 14:07:13 +0100 Subject: [PATCH 2/2] Hacky fix of SqlColumnType::union --- src/repr/src/relation.rs | 42 +++++++++++++++++++++++++++++++++++++++- src/repr/src/scalar.rs | 17 ++++++++++------ 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/src/repr/src/relation.rs b/src/repr/src/relation.rs index ca5d4cbd118b5..a980c6f65a4db 100644 --- a/src/repr/src/relation.rs +++ b/src/repr/src/relation.rs @@ -30,6 +30,7 @@ pub use crate::relation_and_scalar::{ ProtoRelationVersion, }; use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column}; +use crate::SqlScalarType::{Array, List, Map, Range}; /// The type of a [`Datum`]. /// @@ -68,12 +69,51 @@ impl SqlColumnType { nullable: self.nullable || other.nullable, }) } - (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => { + (scalar_type, other_scalar_type) if scalar_type.base_eq_with_nullability(other_scalar_type) => { Ok(SqlColumnType { scalar_type: scalar_type.without_modifiers(), nullable: self.nullable || other.nullable, }) } + + ( + List { + element_type: l, + custom_id: oid_l, + }, + List { + element_type: r, + custom_id: _oid_r, + }, + ) if SqlColumnType {scalar_type: (**l).clone(), nullable: true}.union(&SqlColumnType {scalar_type: (**r).clone(), nullable: true}).is_ok() => { + Ok(SqlColumnType { + scalar_type: List { + element_type: Box::new(SqlColumnType {scalar_type: (**l).clone(), nullable: true}.union(&SqlColumnType {scalar_type: (**r).clone(), nullable: true}).unwrap().scalar_type), + custom_id: None, //////// todo + }, + nullable: self.nullable || other.nullable, + }) + } + ( + Map { + value_type: l, + custom_id: oid_l, + }, + Map { + value_type: r, + custom_id: _oid_r, + }, + ) if SqlColumnType {scalar_type: (**l).clone(), nullable: true}.union(&SqlColumnType {scalar_type: (**r).clone(), nullable: true}).is_ok() => { + Ok(SqlColumnType { + scalar_type: Map { + value_type: Box::new(SqlColumnType {scalar_type: (**l).clone(), nullable: true}.union(&SqlColumnType {scalar_type: (**r).clone(), nullable: true}).unwrap().scalar_type), + custom_id: None, //////// todo + }, + nullable: self.nullable || other.nullable, + }) + } + ///////// todo: Array; Range + ( SqlScalarType::Record { fields, custom_id }, SqlScalarType::Record { diff --git a/src/repr/src/scalar.rs b/src/repr/src/scalar.rs index 7f33492bf299a..1e2507925fcd4 100644 --- a/src/repr/src/scalar.rs +++ b/src/repr/src/scalar.rs @@ -3156,16 +3156,20 @@ impl SqlScalarType { /// contrast, two `Numeric` values with different scales are never `Eq` to /// one another. pub fn base_eq(&self, other: &SqlScalarType) -> bool { - self.eq_inner(other, false) + self.eq_inner(other, false, true) + } + + pub fn base_eq_with_nullability(&self, other: &SqlScalarType) -> bool { + self.eq_inner(other, false, false) } // Determines equality among scalar types that ignores any custom OIDs or // embedded values. pub fn structural_eq(&self, other: &SqlScalarType) -> bool { - self.eq_inner(other, true) + self.eq_inner(other, true, true) } - pub fn eq_inner(&self, other: &SqlScalarType, structure_only: bool) -> bool { + pub fn eq_inner(&self, other: &SqlScalarType, structure_only: bool, ignore_nullability: bool) -> bool { use SqlScalarType::*; match (self, other) { ( @@ -3187,9 +3191,9 @@ impl SqlScalarType { value_type: r, custom_id: oid_r, }, - ) => l.eq_inner(r, structure_only) && (oid_l == oid_r || structure_only), + ) => l.eq_inner(r, structure_only, ignore_nullability) && (oid_l == oid_r || structure_only), (Array(a), Array(b)) | (Range { element_type: a }, Range { element_type: b }) => { - a.eq_inner(b, structure_only) + a.eq_inner(b, structure_only, ignore_nullability) } ( Record { @@ -3209,7 +3213,8 @@ impl SqlScalarType { // Ignore nullability. .all(|(a, b)| { (a.0 == b.0 || structure_only) - && a.1.scalar_type.eq_inner(&b.1.scalar_type, structure_only) + && a.1.scalar_type.eq_inner(&b.1.scalar_type, structure_only, ignore_nullability) + && if !ignore_nullability {a.1.nullable == b.1.nullable} else {true} }) } (s, o) => SqlScalarBaseType::from(s) == SqlScalarBaseType::from(o),