 from onetl.connection import MSSQL
 from onetl.db import DBReader
 from pyspark.sql import DataFrame
+from pyspark.sql.functions import col, date_trunc
 from sqlalchemy.ext.asyncio import AsyncSession
 
 from syncmaster.db.models import Connection, Group, Queue, Status, Transfer
@@ -37,7 +38,7 @@ async def postgres_to_mssql(
         },
         target_params={
             "type": "mssql",
-            "table_name": f"{mssql_for_conftest.user}.target_table",
+            "table_name": "dbo.target_table",
         },
         queue_id=queue.id,
     )
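
In onetl, an MSSQL table is addressed as `schema.table`, so these fixtures now point at the server's default `dbo` schema instead of the per-user schema. A minimal sketch of reading such a table back, assuming placeholder connection values (the real tests take them from conftest fixtures):

from onetl.connection import MSSQL
from onetl.db import DBReader
from pyspark.sql import SparkSession

# The session must have the MSSQL JDBC driver on its classpath.
spark = SparkSession.builder.getOrCreate()

# Placeholder connection values; the real tests take them from fixtures.
mssql = MSSQL(
    host="mssql.example.com",
    port=1433,
    user="syncmaster",
    password="***",
    database="test",
    spark=spark,
)

reader = DBReader(
    connection=mssql,
    table="dbo.target_table",  # schema-qualified, matching the fixture above
)
df = reader.run()
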
@@ -63,7 +64,7 @@ async def mssql_to_postgres(
         target_connection_id=postgres_connection.id,
         source_params={
             "type": "mssql",
-            "table_name": f"{mssql_for_conftest.user}.source_table",
+            "table_name": "dbo.source_table",
         },
         target_params={
             "type": "postgres",
@@ -113,10 +114,14 @@ async def test_run_transfer_postgres_to_mssql(
     assert "password" not in target_auth_data
     reader = DBReader(
         connection=mssql,
-        table=f"{mssql.user}.target_table",
+        table="dbo.target_table",
     )
     df = reader.run()
 
+    # Spark rounds datetimes to milliseconds when writing to MSSQL, so truncate both sides to whole seconds before comparing: https://onetl.readthedocs.io/en/latest/connection/db_connection/mssql/types.html#id5
+    df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
+    init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
+
     for field in init_df.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
 
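
Why the truncation matters: MSSQL's datetime type keeps only millisecond precision, while Spark timestamps carry microseconds, so a value survives the round trip only approximately. A standalone sketch of the same date_trunc trick, with a local session and an illustrative column name:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_trunc

spark = SparkSession.builder.master("local[1]").getOrCreate()

# A timestamp with microseconds, as Spark produces it before the MSSQL write.
df = spark.createDataFrame([("2023-01-01 12:00:00.123456",)], ["REGISTERED_AT"])
df = df.withColumn("REGISTERED_AT", col("REGISTERED_AT").cast("timestamp"))

# Zero out the sub-second part so the written and the read-back DataFrame
# compare equal regardless of how the driver rounded the value.
df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
df.show(truncate=False)  # -> 2023-01-01 12:00:00
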
@@ -161,11 +166,19 @@ async def test_run_transfer_postgres_to_mssql_mixed_naming(
 
     reader = DBReader(
         connection=mssql,
-        table=f"{mssql.user}.target_table",
+        table="dbo.target_table",
     )
     df = reader.run()
+
     assert df.columns != init_df_with_mixed_column_naming.columns
-    assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns]
+    assert df.columns == [column.upper() for column in init_df_with_mixed_column_naming.columns]
+
+    # Spark rounds datetimes to milliseconds when writing to MSSQL, so truncate both sides to whole seconds before comparing: https://onetl.readthedocs.io/en/latest/connection/db_connection/mssql/types.html#id5
+    df = df.withColumn("Registered At", date_trunc("second", col("Registered At")))
+    init_df_with_mixed_column_naming = init_df_with_mixed_column_naming.withColumn(
+        "Registered At",
+        date_trunc("second", col("Registered At")),
+    )
 
     for field in init_df_with_mixed_column_naming.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
@@ -215,6 +228,10 @@ async def test_run_transfer_mssql_to_postgres(
     )
     df = reader.run()
 
+    # Spark rounds datetimes to milliseconds when writing to MSSQL, so truncate both sides to whole seconds before comparing: https://onetl.readthedocs.io/en/latest/connection/db_connection/mssql/types.html#id5
+    df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
+    init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
+
     for field in init_df.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
 
@@ -266,6 +283,13 @@ async def test_run_transfer_mssql_to_postgres_mixed_naming(
     assert df.columns != init_df_with_mixed_column_naming.columns
     assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns]
 
+    # Spark rounds datetimes to milliseconds when writing to MSSQL, so truncate both sides to whole seconds before comparing: https://onetl.readthedocs.io/en/latest/connection/db_connection/mssql/types.html#id5
+    df = df.withColumn("Registered At", date_trunc("second", col("Registered At")))
+    init_df_with_mixed_column_naming = init_df_with_mixed_column_naming.withColumn(
+        "Registered At",
+        date_trunc("second", col("Registered At")),
+    )
+
     for field in init_df_with_mixed_column_naming.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
 
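
The cast loop above aligns the read-back DataFrame's types with the source schema before the final equality assertion, since JDBC type mapping (e.g. NVARCHAR coming back as string) can otherwise differ. A hedged sketch of that comparison pattern as a helper, assuming two in-memory DataFrames rather than the fixtures used here:

from pyspark.sql import DataFrame

def assert_frames_equal(df: DataFrame, reference: DataFrame) -> None:
    # Cast every column to the reference dtype so driver-specific type
    # mapping does not fail the comparison.
    for field in reference.schema:
        df = df.withColumn(field.name, df[field.name].cast(field.dataType))
    # Order-insensitive row comparison.
    assert df.sort(df.columns).collect() == reference.sort(reference.columns).collect()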