Commit 51591b9

Author: Ilyas Gasanov (committed)

[DOP-19901] Add integration MSSQL tests & CI

1 parent: 794f956

File tree

4 files changed: +38, -14 lines


docker-compose.test.yml

Lines changed: 1 addition & 1 deletion
@@ -95,7 +95,7 @@ services:
         condition: service_healthy
       rabbitmq:
         condition: service_healthy
-    profiles: [worker, scheduler, s3, oracle, hdfs, hive, all, clickhouse, mssql]
+    profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mssql, all]
 
   test-postgres:
     image: postgres
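Note (not part of the diff): Compose profiles gate which services start, so an invocation such as `docker compose -f docker-compose.test.yml --profile mssql up -d` picks this service up through the new `mssql` entry, and `--profile all` still covers it. Moving `all` to the end of the list is cosmetic, since profile matching ignores list order.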

syncmaster/worker/handlers/db/mssql.py

Lines changed: 1 addition & 1 deletion
@@ -34,5 +34,5 @@ def connect(self, spark: SparkSession):
 
     def normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
-            df = df.withColumnRenamed(column_name, column_name.upper())
+            df = df.withColumnRenamed(column_name, column_name.lower())
         return df
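For illustration only (not part of the commit): a minimal standalone sketch of the handler's new lower-casing rename loop, assuming a local SparkSession; the sample DataFrame and its column names are hypothetical.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("normalize-sketch").getOrCreate()

# Hypothetical frame with mixed-case column names, standing in for data read via the MSSQL handler.
df = spark.createDataFrame([(1, "Alice")], ["Id", "User Name"])

# Same loop as normalize_column_names: rename every column to its lower-case form.
for column_name in df.columns:
    df = df.withColumnRenamed(column_name, column_name.lower())

print(df.columns)  # ['id', 'user name']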

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 7 additions & 7 deletions
@@ -637,31 +637,31 @@ def prepare_mssql(
         spark=spark,
     ).check()
     try:
-        onetl_conn.execute(f"DROP TABLE {mssql.user}.source_table")
+        onetl_conn.execute(f"DROP TABLE dbo.source_table")
     except Exception:
         pass
     try:
-        onetl_conn.execute(f"DROP TABLE {mssql.user}.target_table")
+        onetl_conn.execute(f"DROP TABLE dbo.target_table")
     except Exception:
         pass
 
     def fill_with_data(df: DataFrame):
-        logger.info("START PREPARE ORACLE")
+        logger.info("START PREPARE MSSQL")
         db_writer = DBWriter(
             connection=onetl_conn,
-            target=f"{mssql.user}.source_table",
+            target="dbo.source_table",
         )
         db_writer.run(df)
-        logger.info("END PREPARE ORACLE")
+        logger.info("END PREPARE MSSQL")
 
     yield onetl_conn, fill_with_data
 
     try:
-        onetl_conn.execute(f"DROP TABLE {mssql.user}.source_table")
+        onetl_conn.execute(f"DROP TABLE dbo.source_table")
     except Exception:
         pass
     try:
-        onetl_conn.execute(f"DROP TABLE {mssql.user}.target_table")
+        onetl_conn.execute(f"DROP TABLE dbo.target_table")
     except Exception:
         pass
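Design note (an assumption, not what the commit does): the try/except blocks above swallow every exception just to tolerate a missing table. On SQL Server 2016+ the same cleanup could stay idempotent without the blanket except by using DROP TABLE IF EXISTS, e.g. with the fixture's onetl_conn from above:

onetl_conn.execute("DROP TABLE IF EXISTS dbo.source_table")  # no-op when the table is absent (SQL Server 2016+)
onetl_conn.execute("DROP TABLE IF EXISTS dbo.target_table")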
667667

tests/test_integration/test_run_transfer/test_mssql.py

Lines changed: 29 additions & 5 deletions
@@ -6,6 +6,7 @@
 from onetl.connection import MSSQL
 from onetl.db import DBReader
 from pyspark.sql import DataFrame
+from pyspark.sql.functions import col, date_trunc
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from syncmaster.db.models import Connection, Group, Queue, Status, Transfer
@@ -37,7 +38,7 @@ async def postgres_to_mssql(
         },
         target_params={
             "type": "mssql",
-            "table_name": f"{mssql_for_conftest.user}.target_table",
+            "table_name": "dbo.target_table",
         },
         queue_id=queue.id,
     )
@@ -63,7 +64,7 @@ async def mssql_to_postgres(
         target_connection_id=postgres_connection.id,
         source_params={
             "type": "mssql",
-            "table_name": f"{mssql_for_conftest.user}.source_table",
+            "table_name": "dbo.source_table",
         },
         target_params={
             "type": "postgres",
@@ -113,10 +114,14 @@ async def test_run_transfer_postgres_to_mssql(
     assert "password" not in target_auth_data
     reader = DBReader(
         connection=mssql,
-        table=f"{mssql.user}.target_table",
+        table="dbo.target_table",
     )
     df = reader.run()
 
+    # as spark rounds datetime up to milliseconds while writing to mssql: https://onetl.readthedocs.io/en/latest/connection/db_connection/mssql/types.html#id5
+    df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
+    init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
+
     for field in init_df.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
 
@@ -161,11 +166,19 @@ async def test_run_transfer_postgres_to_mssql_mixed_naming(
 
     reader = DBReader(
         connection=mssql,
-        table=f"{mssql.user}.target_table",
+        table=f"dbo.target_table",
     )
     df = reader.run()
+
     assert df.columns != init_df_with_mixed_column_naming.columns
-    assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns]
+    assert df.columns == [column.upper() for column in init_df_with_mixed_column_naming.columns]
+
+    # as spark rounds datetime up to milliseconds while writing to mssql: https://onetl.readthedocs.io/en/latest/connection/db_connection/mssql/types.html#id5
+    df = df.withColumn("Registered At", date_trunc("second", col("Registered At")))
+    init_df_with_mixed_column_naming = init_df_with_mixed_column_naming.withColumn(
+        "Registered At",
+        date_trunc("second", col("Registered At")),
+    )
 
     for field in init_df_with_mixed_column_naming.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
@@ -215,6 +228,10 @@ async def test_run_transfer_mssql_to_postgres(
     )
     df = reader.run()
 
+    # as spark rounds datetime up to milliseconds while writing to mssql: https://onetl.readthedocs.io/en/latest/connection/db_connection/mssql/types.html#id5
+    df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
+    init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
+
     for field in init_df.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
 
@@ -266,6 +283,13 @@ async def test_run_transfer_mssql_to_postgres_mixed_naming(
     assert df.columns != init_df_with_mixed_column_naming.columns
     assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns]
 
+    # as spark rounds datetime up to milliseconds while writing to mssql: https://onetl.readthedocs.io/en/latest/connection/db_connection/mssql/types.html#id5
+    df = df.withColumn("Registered At", date_trunc("second", col("Registered At")))
+    init_df_with_mixed_column_naming = init_df_with_mixed_column_naming.withColumn(
+        "Registered At",
+        date_trunc("second", col("Registered At")),
+    )
+
     for field in init_df_with_mixed_column_naming.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
 
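For illustration only (not part of the commit): the repeated date_trunc lines implement the rounding workaround described in the diff comments. Spark's MSSQL write path keeps only millisecond precision for datetimes, so the tests truncate both frames to whole seconds before comparing them. A minimal self-contained sketch of the same idea, assuming a local SparkSession and hypothetical data:

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_trunc

spark = SparkSession.builder.master("local[1]").appName("trunc-sketch").getOrCreate()

# Two hypothetical frames whose timestamps differ only below one second,
# mimicking an exact source row versus its rounded copy read back from MSSQL.
expected = spark.createDataFrame([(1, datetime(2024, 1, 1, 12, 0, 0, 123000))], ["id", "registered_at"])
actual = spark.createDataFrame([(1, datetime(2024, 1, 1, 12, 0, 0, 124000))], ["id", "registered_at"])

# Truncate both sides to whole seconds, as the tests do, so sub-second noise cannot fail the comparison.
expected = expected.withColumn("registered_at", date_trunc("second", col("registered_at")))
actual = actual.withColumn("registered_at", date_trunc("second", col("registered_at")))

assert expected.collect() == actual.collect()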