Skip to content

Commit fee745c

Browse files
committed
update partition handling in integration tests
1 parent 9cf11b5 commit fee745c

File tree

3 files changed

+60
-72
lines changed

3 files changed

+60
-72
lines changed

sqlmesh/core/engine_adapter/doris.py

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class DorisEngineAdapter(
4141
LogicalMergeMixin, PandasNativeFetchDFSupportMixin, NonTransactionalTruncateMixin
4242
):
4343
DIALECT = "doris"
44-
DEFAULT_BATCH_SIZE = 200
44+
DEFAULT_BATCH_SIZE = 5000
4545
SUPPORTS_TRANSACTIONS = False
4646
COMMENT_CREATION_TABLE = CommentCreationTable.IN_SCHEMA_DEF_NO_CTAS
4747
COMMENT_CREATION_VIEW = CommentCreationView.IN_SCHEMA_DEF_NO_COMMANDS
@@ -54,8 +54,6 @@ class DorisEngineAdapter(
5454
SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True
5555
SUPPORTS_CREATE_DROP_CATALOG = False
5656
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.DELETE_INSERT
57-
# default setting `enable_unicode_name_support=false` so it is incompatible with unicode characters in model names
58-
QUOTE_IDENTIFIERS_IN_VIEWS = False
5957

6058
def create_schema(
6159
self,
@@ -937,33 +935,6 @@ def _parse_trigger_string(
937935
if add_partition:
938936
partitions = table_properties_copy.pop("partitions", None)
939937

940-
# If partitioned_by is provided but partitions is not, add dynamic partition properties
941-
# Skip dynamic partitions for materialized views as they use different partitioning
942-
if partitioned_by and not partitions and not is_materialized_view:
943-
# Define the required dynamic partition properties
944-
dynamic_partition_props = {
945-
"dynamic_partition.enable": "true",
946-
"dynamic_partition.time_unit": "DAY",
947-
"dynamic_partition.start": "-490",
948-
"dynamic_partition.end": "10",
949-
"dynamic_partition.prefix": "p",
950-
"dynamic_partition.buckets": "32",
951-
"dynamic_partition.create_history_partition": "true",
952-
}
953-
954-
# Use partition_interval_unit if provided to set the time_unit
955-
if partition_interval_unit:
956-
if hasattr(partition_interval_unit, "value"):
957-
time_unit = partition_interval_unit.value.upper()
958-
else:
959-
time_unit = str(partition_interval_unit).upper()
960-
dynamic_partition_props["dynamic_partition.time_unit"] = time_unit
961-
962-
# Add missing dynamic partition properties to table_properties_copy
963-
for key, value in dynamic_partition_props.items():
964-
if key not in table_properties_copy:
965-
table_properties_copy[key] = value
966-
967938
# Build partition expression - different for materialized views vs tables
968939
if is_materialized_view:
969940
# For materialized views, use PartitionedByProperty

tests/core/engine_adapter/integration/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ def timestamp_columns(self) -> t.List[str]:
266266
for k, v in self.columns_to_types.items()
267267
if v.sql().lower().startswith("timestamp")
268268
or (v.sql().lower() == "datetime" and self.dialect == "bigquery")
269+
or (v.sql().lower() == "datetime" and self.dialect == "doris")
269270
]
270271

271272
@property

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 58 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import pytest
2222
import pytz
2323
import time_machine
24-
from tenacity import Retrying, stop_after_delay, wait_fixed, retry_if_exception_type
2524
from sqlglot import exp, parse_one
2625
from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
2726
from sqlglot.optimizer.qualify_columns import quote_identifiers
@@ -459,18 +458,18 @@ def test_materialized_view(ctx_query_and_df: TestContext):
459458
# Make sure that dropping a materialized view also works
460459
if ctx.engine_adapter.dialect == "doris":
461460
# Wait for the materialized view to be created by retrying drop until it succeeds
462-
for attempt in Retrying(
463-
stop=stop_after_delay(5),
464-
wait=wait_fixed(1),
465-
retry=retry_if_exception_type(Exception),
466-
reraise=True,
467-
):
468-
with attempt:
461+
def drop_view_success():
462+
try:
469463
ctx.engine_adapter.drop_view(
470464
view,
471465
materialized=True,
472466
view_properties={"materialized_type": "SYNC", "source_table": source_table},
473467
)
468+
return True
469+
except Exception:
470+
return False
471+
472+
wait_until(drop_view_success, attempts=5, wait=1)
474473
else:
475474
ctx.engine_adapter.drop_view(view, materialized=True)
476475
results = ctx.get_metadata_results()
@@ -813,37 +812,37 @@ def test_insert_overwrite_by_time_partition(ctx_query_and_df: TestContext):
813812
if ctx.dialect == "tsql":
814813
ds_type = "varchar(max)"
815814
if ctx.dialect == "doris":
816-
ds_type = "date"
815+
ds_type = "datetime"
817816

818-
# Get current year and create dates for testing. Doris cannot have more than 500 history partitions.
819-
current_year = datetime.now().year
820-
current_date = datetime(current_year, 1, 1)
821-
if ctx.dialect == "doris":
822-
# For Doris with DATE type, use pandas date objects
823-
date_1 = current_date.date()
824-
date_2 = (current_date + timedelta(days=1)).date()
825-
date_3 = (current_date + timedelta(days=2)).date()
826-
date_4 = (current_date + timedelta(days=3)).date()
827-
date_5 = (current_date + timedelta(days=4)).date()
828-
else:
829-
date_1 = current_date.strftime("%Y-%m-%d")
830-
date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
831-
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
832-
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
833-
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
817+
# Get current create date for testing.
818+
current_date = datetime.now()
819+
date_1 = current_date.strftime("%Y-%m-%d")
820+
date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
821+
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
822+
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
823+
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
824+
date_6 = (current_date + timedelta(days=5)).strftime("%Y-%m-%d")
834825

835826
ctx.columns_to_types = {"id": "int", "ds": ds_type}
836827
table = ctx.table("test_table")
837828
if ctx.dialect == "bigquery":
838829
partitioned_by = ["DATE(ds)"]
839830
else:
840831
partitioned_by = ctx.partitioned_by # type: ignore
832+
if ctx.dialect == "doris":
833+
table_properties = {
834+
"partitions": f"FROM ('{date_1}') TO ('{date_6}') INTERVAL 1 DAY",
835+
}
836+
else:
837+
table_properties = {}
838+
841839
ctx.engine_adapter.create_table(
842840
table,
843841
ctx.columns_to_types,
844842
partitioned_by=partitioned_by,
845843
partition_interval_unit="DAY",
846844
table_format=ctx.default_table_format,
845+
table_properties=table_properties,
847846
)
848847
input_data = pd.DataFrame(
849848
[
@@ -922,24 +921,16 @@ def test_insert_overwrite_by_time_partition_source_columns(ctx_query_and_df: Tes
922921
if ctx.dialect == "tsql":
923922
ds_type = "varchar(max)"
924923
if ctx.dialect == "doris":
925-
ds_type = "date"
924+
ds_type = "datetime"
926925

927-
# Get current year and create dates for testing. Doris cannot have more than 500 history partitions.
928-
current_year = datetime.now().year
929-
current_date = datetime(current_year, 1, 1)
930-
if ctx.dialect == "doris":
931-
# For Doris with DATE type, use pandas date objects
932-
date_1 = current_date.date()
933-
date_2 = (current_date + timedelta(days=1)).date()
934-
date_3 = (current_date + timedelta(days=2)).date()
935-
date_4 = (current_date + timedelta(days=3)).date()
936-
date_5 = (current_date + timedelta(days=4)).date()
937-
else:
938-
date_1 = current_date.strftime("%Y-%m-%d")
939-
date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
940-
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
941-
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
942-
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
926+
# Get current create date for testing.
927+
current_date = datetime.now()
928+
date_1 = current_date.strftime("%Y-%m-%d")
929+
date_2 = (current_date + timedelta(days=1)).strftime("%Y-%m-%d")
930+
date_3 = (current_date + timedelta(days=2)).strftime("%Y-%m-%d")
931+
date_4 = (current_date + timedelta(days=3)).strftime("%Y-%m-%d")
932+
date_5 = (current_date + timedelta(days=4)).strftime("%Y-%m-%d")
933+
date_6 = (current_date + timedelta(days=5)).strftime("%Y-%m-%d")
943934

944935
ctx.columns_to_types = {"id": "int", "ds": ds_type}
945936
columns_to_types = {
@@ -952,12 +943,20 @@ def test_insert_overwrite_by_time_partition_source_columns(ctx_query_and_df: Tes
952943
partitioned_by = ["DATE(ds)"]
953944
else:
954945
partitioned_by = ctx.partitioned_by # type: ignore
946+
if ctx.dialect == "doris":
947+
table_properties = {
948+
"partitions": f"FROM ('{date_1}') TO ('{date_6}') INTERVAL 1 DAY",
949+
}
950+
else:
951+
table_properties = {}
952+
955953
ctx.engine_adapter.create_table(
956954
table,
957955
columns_to_types,
958956
partitioned_by=partitioned_by,
959957
partition_interval_unit="DAY",
960958
table_format=ctx.default_table_format,
959+
table_properties=table_properties,
961960
)
962961
input_data = pd.DataFrame(
963962
[
@@ -2181,6 +2180,18 @@ def _mutate_config(gateway: str, config: Config) -> None:
21812180
)
21822181
context._models.update({model_key: model})
21832182

2183+
# Doris requires partitions to be set in physical_properties for INCREMENTAL_BY_TIME_RANGE models
2184+
if ctx.dialect == "doris":
2185+
for model_key, model in context._models.items():
2186+
if model.kind.name == "INCREMENTAL_BY_TIME_RANGE":
2187+
end_plus_1day = to_date(end + timedelta(days=1))
2188+
partitions = f"FROM ('{start.strftime('%Y-%m-%d')}') TO ('{end_plus_1day.strftime('%Y-%m-%d')}') INTERVAL 1 DAY"
2189+
2190+
model_physical_props = model.copy(
2191+
update={"physical_properties": {"partitions": partitions}}
2192+
)
2193+
context._models.update({model_key: model_physical_props})
2194+
21842195
plan: Plan = context.plan(
21852196
environment="test_prod",
21862197
start=start,
@@ -4086,6 +4097,11 @@ def test_unicode_characters(ctx: TestContext, tmp_path: Path):
40864097
# I also think Spark may not support unicode in general but that would need to be verified.
40874098
if not ctx.engine_adapter.QUOTE_IDENTIFIERS_IN_VIEWS:
40884099
pytest.skip("Skipping as these engines have issues with unicode characters in model names")
4100+
# Doris default setting `enable_unicode_name_support=false` so it is incompatible with unicode characters in model names
4101+
if ctx.dialect == "doris":
4102+
pytest.skip(
4103+
"Skipping as Doris default setting has issues with unicode characters in model names"
4104+
)
40894105

40904106
model_name = "客户数据"
40914107
table = ctx.table(model_name).sql(dialect=ctx.dialect)

0 commit comments

Comments
 (0)