Skip to content

Commit 67eaded

Browse files
authored
Fix jdbc logical type not found when python sdk worker running in docker env (#36014)
* Fix jdbc logical type not found when python sdk worker running in docker env. * Add TODO. * Move JdbcTimeType and JdbcDateType to schemas.py. * Fix lint * Revert the previous import hack.
1 parent d28fd64 commit 67eaded

File tree

2 files changed

+94
-90
lines changed

2 files changed

+94
-90
lines changed

sdks/python/apache_beam/io/jdbc.py

Lines changed: 2 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@
8787
# pytype: skip-file
8888

8989
import contextlib
90-
import datetime
9190
import typing
9291

9392
import numpy as np
@@ -96,10 +95,11 @@
9695
from apache_beam.transforms.external import BeamJarExpansionService
9796
from apache_beam.transforms.external import ExternalTransform
9897
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
98+
from apache_beam.typehints.schemas import JdbcDateType # pylint: disable=unused-import
99+
from apache_beam.typehints.schemas import JdbcTimeType # pylint: disable=unused-import
99100
from apache_beam.typehints.schemas import LogicalType
100101
from apache_beam.typehints.schemas import MillisInstant
101102
from apache_beam.typehints.schemas import typing_to_runner_api
102-
from apache_beam.utils.timestamp import Timestamp
103103

104104
__all__ = [
105105
'WriteToJdbc',
@@ -399,91 +399,3 @@ def __init__(
399399
),
400400
expansion_service or default_io_expansion_service(classpath),
401401
)
402-
403-
404-
@LogicalType.register_logical_type
405-
class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
406-
"""
407-
For internal use only; no backwards-compatibility guarantees.
408-
409-
Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO
410-
has been migrated to Beam portable logical types.
411-
"""
412-
def __init__(self, argument=""):
413-
pass
414-
415-
@classmethod
416-
def representation_type(cls) -> type:
417-
return MillisInstant
418-
419-
@classmethod
420-
def urn(cls):
421-
return "beam:logical_type:javasdk_date:v1"
422-
423-
@classmethod
424-
def language_type(cls):
425-
return datetime.date
426-
427-
def to_representation_type(self, value: datetime.date) -> Timestamp:
428-
return Timestamp.from_utc_datetime(
429-
datetime.datetime.combine(
430-
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))
431-
432-
def to_language_type(self, value: Timestamp) -> datetime.date:
433-
return value.to_utc_datetime().date()
434-
435-
@classmethod
436-
def argument_type(cls):
437-
return str
438-
439-
def argument(self):
440-
return ""
441-
442-
@classmethod
443-
def _from_typing(cls, typ):
444-
return cls()
445-
446-
447-
@LogicalType.register_logical_type
448-
class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]):
449-
"""
450-
For internal use only; no backwards-compatibility guarantees.
451-
452-
Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java
453-
JDBCIO has been migrated to Beam portable logical types.
454-
"""
455-
def __init__(self, argument=""):
456-
pass
457-
458-
@classmethod
459-
def representation_type(cls) -> type:
460-
return MillisInstant
461-
462-
@classmethod
463-
def urn(cls):
464-
return "beam:logical_type:javasdk_time:v1"
465-
466-
@classmethod
467-
def language_type(cls):
468-
return datetime.time
469-
470-
def to_representation_type(self, value: datetime.date) -> Timestamp:
471-
return Timestamp.from_utc_datetime(
472-
datetime.datetime.combine(
473-
datetime.datetime.utcfromtimestamp(0),
474-
value,
475-
tzinfo=datetime.timezone.utc))
476-
477-
def to_language_type(self, value: Timestamp) -> datetime.date:
478-
return value.to_utc_datetime().time()
479-
480-
@classmethod
481-
def argument_type(cls):
482-
return str
483-
484-
def argument(self):
485-
return ""
486-
487-
@classmethod
488-
def _from_typing(cls, typ):
489-
return cls()

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666

6767
# pytype: skip-file
6868

69+
import datetime
6970
import decimal
7071
import logging
7172
from typing import Any
@@ -1189,3 +1190,94 @@ def argument_type(cls):
11891190

11901191
def argument(self):
11911192
return self.max_length
1193+
1194+
1195+
# TODO: A temporary fix for missing jdbc logical types.
1196+
# See the discussion in https://github.com/apache/beam/issues/35738 for
1197+
# more detail.
1198+
@LogicalType.register_logical_type
1199+
class JdbcDateType(LogicalType[datetime.date, MillisInstant, str]):
1200+
"""
1201+
For internal use only; no backwards-compatibility guarantees.
1202+
1203+
Support of Legacy JdbcIO DATE logical type. Deemed to change when Java JDBCIO
1204+
has been migrated to Beam portable logical types.
1205+
"""
1206+
def __init__(self, argument=""):
1207+
pass
1208+
1209+
@classmethod
1210+
def representation_type(cls) -> type:
1211+
return MillisInstant
1212+
1213+
@classmethod
1214+
def urn(cls):
1215+
return "beam:logical_type:javasdk_date:v1"
1216+
1217+
@classmethod
1218+
def language_type(cls):
1219+
return datetime.date
1220+
1221+
def to_representation_type(self, value: datetime.date) -> Timestamp:
1222+
return Timestamp.from_utc_datetime(
1223+
datetime.datetime.combine(
1224+
value, datetime.datetime.min.time(), tzinfo=datetime.timezone.utc))
1225+
1226+
def to_language_type(self, value: Timestamp) -> datetime.date:
1227+
return value.to_utc_datetime().date()
1228+
1229+
@classmethod
1230+
def argument_type(cls):
1231+
return str
1232+
1233+
def argument(self):
1234+
return ""
1235+
1236+
@classmethod
1237+
def _from_typing(cls, typ):
1238+
return cls()
1239+
1240+
1241+
@LogicalType.register_logical_type
1242+
class JdbcTimeType(LogicalType[datetime.time, MillisInstant, str]):
1243+
"""
1244+
For internal use only; no backwards-compatibility guarantees.
1245+
1246+
Support of Legacy JdbcIO TIME logical type. . Deemed to change when Java
1247+
JDBCIO has been migrated to Beam portable logical types.
1248+
"""
1249+
def __init__(self, argument=""):
1250+
pass
1251+
1252+
@classmethod
1253+
def representation_type(cls) -> type:
1254+
return MillisInstant
1255+
1256+
@classmethod
1257+
def urn(cls):
1258+
return "beam:logical_type:javasdk_time:v1"
1259+
1260+
@classmethod
1261+
def language_type(cls):
1262+
return datetime.time
1263+
1264+
def to_representation_type(self, value: datetime.time) -> Timestamp:
1265+
return Timestamp.from_utc_datetime(
1266+
datetime.datetime.combine(
1267+
datetime.datetime.utcfromtimestamp(0),
1268+
value,
1269+
tzinfo=datetime.timezone.utc))
1270+
1271+
def to_language_type(self, value: Timestamp) -> datetime.time:
1272+
return value.to_utc_datetime().time()
1273+
1274+
@classmethod
1275+
def argument_type(cls):
1276+
return str
1277+
1278+
def argument(self):
1279+
return ""
1280+
1281+
@classmethod
1282+
def _from_typing(cls, typ):
1283+
return cls()

0 commit comments

Comments
 (0)