@@ -6,7 +6,6 @@
 from onetl.connection import Clickhouse
 from onetl.db import DBReader
 from pyspark.sql import DataFrame
-from pyspark.sql.functions import col, date_trunc
 from sqlalchemy.ext.asyncio import AsyncSession

 from syncmaster.db.models import Connection, Group, Queue, Status, Transfer
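
For orientation, the `Clickhouse` and `DBReader` imports back the read-verify step in the hunks below. A minimal sketch of that pattern, with hypothetical connection parameters standing in for the test fixtures (host, port, and credentials here are placeholders, and the Spark session must already have the ClickHouse JDBC driver on its classpath):

```python
from onetl.connection import Clickhouse
from onetl.db import DBReader
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical credentials; the real tests take these from fixtures.
clickhouse = Clickhouse(
    host="localhost",
    port=8123,
    user="default",
    password="",
    spark=spark,
)

# Read the written table back as a Spark DataFrame for comparison,
# mirroring the reader construction shown in the first hunk below.
reader = DBReader(
    connection=clickhouse,
    table=f"{clickhouse.user}.target_table",
)
df = reader.run()
```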
@@ -117,8 +116,6 @@ async def test_run_transfer_postgres_to_clickhouse(
         table=f"{clickhouse.user}.target_table",
     )
     df = reader.run()
-    # as spark truncates milliseconds while writing to clickhouse: https://onetl.readthedocs.io/en/latest/connection/db_connection/clickhouse/types.html#id10
-    init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
     for field in init_df.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
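
The lines removed above (and in the three hunks below) were a comparison workaround: ClickHouse's `DateTime` type stores second precision, so Spark drops sub-second parts on write (see the onETL type-mapping link in the removed comment), and the tests truncated the expected frame to match. A minimal standalone sketch of that pattern, assuming hypothetical data in place of the test fixtures:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_trunc

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical expected frame; the real tests build it from fixtures.
expected = spark.createDataFrame(
    [("alice", "2023-01-01 12:00:00.123")],
    "NAME: string, REGISTERED_AT: string",
).withColumn("REGISTERED_AT", col("REGISTERED_AT").cast("timestamp"))

# date_trunc("second", ...) zeroes the sub-second part (.123 -> .000),
# matching what comes back from a second-precision DateTime column.
expected = expected.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
```

Dropping this truncation in all four tests suggests the round trip now preserves sub-second precision, or the fixtures no longer carry it; otherwise the equality checks that follow the cast loop would fail.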
@@ -169,11 +166,6 @@ async def test_run_transfer_postgres_to_clickhouse_mixed_naming(
     assert df.columns != init_df_with_mixed_column_naming.columns
     assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns]

-    # as spark truncates milliseconds while writing to clickhouse: https://onetl.readthedocs.io/en/latest/connection/db_connection/clickhouse/types.html#id10
-    init_df_with_mixed_column_naming = init_df_with_mixed_column_naming.withColumn(
-        "Registered At",
-        date_trunc("second", col("Registered At")),
-    )
     for field in init_df_with_mixed_column_naming.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
@@ -222,8 +214,6 @@ async def test_run_transfer_clickhouse_to_postgres(
     )
     df = reader.run()

-    # as spark truncates milliseconds while writing to clickhouse: https://onetl.readthedocs.io/en/latest/connection/db_connection/clickhouse/types.html#id10
-    init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
     for field in init_df.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))
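
The `for field in init_df.schema` loop kept in every hunk aligns the actual frame's column types to the expected schema before comparison, since a JDBC round trip can shift dtypes. A self-contained sketch of the idea, with hypothetical frames in place of the fixtures:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical frames: init_df is the expected data, df is what came back
# from the target database with a drifted column type (long -> string).
init_df = spark.createDataFrame([(1, "alice")], "ID: long, NAME: string")
df = spark.createDataFrame([("1", "alice")], "ID: string, NAME: string")

# Cast each column of df to the dtype declared in the expected schema,
# so the comparison tests values rather than incidental type differences.
for field in init_df.schema:
    df = df.withColumn(field.name, df[field.name].cast(field.dataType))

assert df.collect() == init_df.collect()
```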
@@ -275,11 +265,6 @@ async def test_run_transfer_clickhouse_to_postgres_mixed_naming(
     assert df.columns != init_df_with_mixed_column_naming.columns
     assert df.columns == [column.lower() for column in init_df_with_mixed_column_naming.columns]

-    # as spark truncates milliseconds while writing to clickhouse: https://onetl.readthedocs.io/en/latest/connection/db_connection/clickhouse/types.html#id10
-    init_df_with_mixed_column_naming = init_df_with_mixed_column_naming.withColumn(
-        "Registered At",
-        date_trunc("second", col("Registered At")),
-    )
     for field in init_df_with_mixed_column_naming.schema:
         df = df.withColumn(field.name, df[field.name].cast(field.dataType))