|
36 | 36 | from apache_beam.testing.test_pipeline import TestPipeline |
37 | 37 | from apache_beam.testing.util import assert_that |
38 | 38 | from apache_beam.testing.util import equal_to |
| 39 | +from apache_beam.typehints.schemas import LogicalType |
| 40 | +from apache_beam.typehints.schemas import MillisInstant |
39 | 41 | from apache_beam.utils.timestamp import Timestamp |
40 | 42 |
|
41 | 43 | # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports |
@@ -239,6 +241,10 @@ def test_xlang_jdbc_write_read(self, database): |
239 | 241 |
|
240 | 242 | config = self.jdbc_configs[database] |
241 | 243 |
|
| 244 | + # Register MillisInstant logical type to override the mapping from Timestamp |
| 245 | + # originally handled by MicrosInstant. |
| 246 | + LogicalType.register_logical_type(MillisInstant) |
| 247 | + |
242 | 248 | with TestPipeline() as p: |
243 | 249 | p.not_use_test_runner_api = True |
244 | 250 | _ = ( |
@@ -349,6 +355,10 @@ def custom_row_equals(expected, actual): |
349 | 355 | classpath=config['classpath'], |
350 | 356 | )) |
351 | 357 |
|
| 358 | + # Register MillisInstant logical type to override the mapping from Timestamp |
| 359 | + # originally handled by MicrosInstant. |
| 360 | + LogicalType.register_logical_type(MillisInstant) |
| 361 | + |
352 | 362 | # Run read pipeline with custom schema |
353 | 363 | with TestPipeline() as p: |
354 | 364 | p.not_use_test_runner_api = True |
|
0 commit comments