|
6 | 6 | from __future__ import annotations |
7 | 7 |
|
8 | 8 | import datetime |
| 9 | +import functools |
9 | 10 | import json |
10 | 11 | import select |
11 | 12 | import typing as t |
|
16 | 17 | import psycopg2 |
17 | 18 | import singer_sdk.helpers._typing |
18 | 19 | import sqlalchemy as sa |
| 20 | +import sqlalchemy.types |
19 | 21 | from psycopg2 import extras |
20 | 22 | from singer_sdk import SQLConnector, SQLStream |
21 | | -from singer_sdk import typing as th |
| 23 | +from singer_sdk.connectors.sql import SQLToJSONSchema |
22 | 24 | from singer_sdk.helpers._state import increment_state |
23 | 25 | from singer_sdk.helpers._typing import TypeConformanceLevel |
24 | 26 | from singer_sdk.streams.core import REPLICATION_INCREMENTAL |
| 27 | +from sqlalchemy.dialects import postgresql |
25 | 28 |
|
26 | 29 | if TYPE_CHECKING: |
27 | 30 | from singer_sdk.helpers.types import Context |
28 | 31 | from sqlalchemy.dialects import postgresql |
29 | 32 | from sqlalchemy.engine import Engine |
30 | 33 | from sqlalchemy.engine.reflection import Inspector |
31 | | - from sqlalchemy.types import TypeEngine |
| 34 | + |
| 35 | + |
| 36 | +class PostgresSQLToJSONSchema(SQLToJSONSchema): |
| 37 | + """Custom SQL to JSON Schema conversion for Postgres.""" |
| 38 | + |
| 39 | + def __init__(self, dates_as_string: bool, *args, **kwargs): |
| 40 | + """Initialize the SQL to JSON Schema converter.""" |
| 41 | + super().__init__(*args, **kwargs) |
| 42 | + self.dates_as_string = dates_as_string |
| 43 | + |
| 44 | + @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] |
| 45 | + def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict: |
| 46 | + """Override the default mapping for ARRAY columns. |
| 47 | +
|
| 48 | + The array's item type is converted recursively to build the `items` schema. |
| 49 | + """ |
| 50 | + return { |
| 51 | + "type": "array", |
| 52 | + "items": self.to_jsonschema(column_type.item_type), |
| 53 | + } |
| 54 | + |
| 55 | + @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] |
| 56 | + def json_to_jsonschema(self, column_type: postgresql.JSON) -> dict: |
| 57 | + """Override the default mapping for JSON and JSONB columns.""" |
| 58 | + return {"type": ["string", "number", "integer", "array", "object", "boolean"]} |
| 59 | + |
| 60 | + @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] |
| 61 | + def datetime_to_jsonschema(self, column_type: sqlalchemy.types.DateTime) -> dict: |
| 62 | + """Override the default mapping for DATETIME columns.""" |
| 63 | + if self.dates_as_string: |
| 64 | + return {"type": ["string", "null"]} |
| 65 | + return super().datetime_to_jsonschema(column_type) |
| 66 | + |
| 67 | + @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] |
| 68 | + def date_to_jsonschema(self, column_type: sqlalchemy.types.Date) -> dict: |
| 69 | + """Override the default mapping for DATE columns.""" |
| 70 | + if self.dates_as_string: |
| 71 | + return {"type": ["string", "null"]} |
| 72 | + return super().date_to_jsonschema(column_type) |
32 | 73 |
|
33 | 74 |
|
34 | 75 | def patched_conform( |
@@ -115,131 +156,10 @@ def __init__( |
115 | 156 |
|
116 | 157 | super().__init__(config=config, sqlalchemy_url=sqlalchemy_url) |
117 | 158 |
|
118 | | - # Note super is static, we can get away with this because this is called once |
119 | | - # and is luckily referenced via the instance of the class |
120 | | - def to_jsonschema_type( # type: ignore[override] |
121 | | - self, |
122 | | - sql_type: str | TypeEngine | type[TypeEngine] | postgresql.ARRAY | Any, |
123 | | - ) -> dict: |
124 | | - """Return a JSON Schema representation of the provided type. |
125 | | -
|
126 | | - Overridden from SQLConnector to correctly handle JSONB and Arrays. |
127 | | -
|
128 | | - Also Overridden in order to call our instance method `sdk_typing_object()` |
129 | | - instead of the static version |
130 | | -
|
131 | | - By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy |
132 | | - types. |
133 | | -
|
134 | | - Args: |
135 | | - sql_type: The string representation of the SQL type, a SQLAlchemy |
136 | | - TypeEngine class or object, or a custom-specified object. |
137 | | -
|
138 | | - Raises: |
139 | | - ValueError: If the type received could not be translated to jsonschema. |
140 | | -
|
141 | | - Returns: |
142 | | - The JSON Schema representation of the provided type. |
143 | | -
|
144 | | - """ |
145 | | - type_name = None |
146 | | - if isinstance(sql_type, str): |
147 | | - type_name = sql_type |
148 | | - elif isinstance(sql_type, sa.types.TypeEngine): |
149 | | - type_name = type(sql_type).__name__ |
150 | | - |
151 | | - if ( |
152 | | - type_name is not None |
153 | | - and isinstance(sql_type, sa.dialects.postgresql.ARRAY) |
154 | | - and type_name == "ARRAY" |
155 | | - ): |
156 | | - array_type = self.sdk_typing_object(sql_type.item_type) |
157 | | - return th.ArrayType(array_type).type_dict |
158 | | - return self.sdk_typing_object(sql_type).type_dict |
159 | | - |
160 | | - def sdk_typing_object( |
161 | | - self, |
162 | | - from_type: str | TypeEngine | type[TypeEngine], |
163 | | - ) -> ( |
164 | | - th.DateTimeType |
165 | | - | th.NumberType |
166 | | - | th.IntegerType |
167 | | - | th.DateType |
168 | | - | th.StringType |
169 | | - | th.BooleanType |
170 | | - | th.CustomType |
171 | | - ): |
172 | | - """Return the JSON Schema dict that describes the sql type. |
173 | | -
|
174 | | - Args: |
175 | | - from_type: The SQL type as a string or as a TypeEngine. If a TypeEngine is |
176 | | - provided, it may be provided as a class or a specific object instance. |
177 | | -
|
178 | | - Raises: |
179 | | - ValueError: If the `from_type` value is not of type `str` or `TypeEngine`. |
180 | | -
|
181 | | - Returns: |
182 | | - A compatible JSON Schema type definition. |
183 | | - """ |
184 | | - # NOTE: This is an ordered mapping, with earlier mappings taking precedence. If |
185 | | - # the SQL-provided type contains the type name on the left, the mapping will |
186 | | - # return the respective singer type. |
187 | | - # NOTE: jsonb and json should theoretically be th.AnyType().type_dict but that |
188 | | - # causes problems down the line with an error like: |
189 | | - # singer_sdk.helpers._typing.EmptySchemaTypeError: Could not detect type from |
190 | | - # empty type_dict. Did you forget to define a property in the stream schema? |
191 | | - sqltype_lookup: dict[ |
192 | | - str, |
193 | | - th.DateTimeType |
194 | | - | th.NumberType |
195 | | - | th.IntegerType |
196 | | - | th.DateType |
197 | | - | th.StringType |
198 | | - | th.BooleanType |
199 | | - | th.CustomType, |
200 | | - ] = { |
201 | | - "jsonb": th.CustomType( |
202 | | - {"type": ["string", "number", "integer", "array", "object", "boolean"]} |
203 | | - ), |
204 | | - "json": th.CustomType( |
205 | | - {"type": ["string", "number", "integer", "array", "object", "boolean"]} |
206 | | - ), |
207 | | - "timestamp": th.DateTimeType(), |
208 | | - "datetime": th.DateTimeType(), |
209 | | - "date": th.DateType(), |
210 | | - "int": th.IntegerType(), |
211 | | - "numeric": th.NumberType(), |
212 | | - "decimal": th.NumberType(), |
213 | | - "double": th.NumberType(), |
214 | | - "float": th.NumberType(), |
215 | | - "real": th.NumberType(), |
216 | | - "float4": th.NumberType(), |
217 | | - "string": th.StringType(), |
218 | | - "text": th.StringType(), |
219 | | - "char": th.StringType(), |
220 | | - "bool": th.BooleanType(), |
221 | | - "variant": th.StringType(), |
222 | | - } |
223 | | - if self.config["dates_as_string"] is True: |
224 | | - sqltype_lookup["date"] = th.StringType() |
225 | | - sqltype_lookup["datetime"] = th.StringType() |
226 | | - if isinstance(from_type, str): |
227 | | - type_name = from_type |
228 | | - elif isinstance(from_type, sa.types.TypeEngine): |
229 | | - type_name = type(from_type).__name__ |
230 | | - elif isinstance(from_type, type) and issubclass(from_type, sa.types.TypeEngine): |
231 | | - type_name = from_type.__name__ |
232 | | - else: |
233 | | - raise ValueError( |
234 | | - "Expected `str` or a SQLAlchemy `TypeEngine` object or type." |
235 | | - ) |
236 | | - |
237 | | - # Look for the type name within the known SQL type names: |
238 | | - for sqltype, jsonschema_type in sqltype_lookup.items(): |
239 | | - if sqltype.lower() in type_name.lower(): |
240 | | - return jsonschema_type |
241 | | - |
242 | | - return sqltype_lookup["string"] # safe failover to str |
| 159 | + @functools.cached_property |
| 160 | + def sql_to_jsonschema(self): |
| 161 | + """Return the SQL-to-JSON Schema type converter for this connector.""" |
| 162 | + return PostgresSQLToJSONSchema(dates_as_string=self.config["dates_as_string"]) |
243 | 163 |
|
244 | 164 | def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: |
245 | 165 | """Return a list of schema names in DB, or overrides with user-provided values. |
|
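For reviewers unfamiliar with the singer-sdk singledispatch converter API, the sketch below illustrates how the new `PostgresSQLToJSONSchema` class is expected to behave once the connector exposes it through the `sql_to_jsonschema` property. This is a minimal illustration, not part of the diff; the import path is assumed to be this tap's client module, and the exact schema dicts returned for scalar item types depend on the installed singer-sdk version.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# Assumed import path for the class added in this diff.
from tap_postgres.client import PostgresSQLToJSONSchema

converter = PostgresSQLToJSONSchema(dates_as_string=False)

# ARRAY columns recurse into the item type to build the "items" schema.
converter.to_jsonschema(postgresql.ARRAY(sa.types.Integer()))
# -> {"type": "array", "items": {"type": ["integer"]}}  (approximate shape)

# JSON and JSONB columns (JSONB subclasses JSON, so it dispatches here too)
# accept any JSON value type except null.
converter.to_jsonschema(postgresql.JSONB())
# -> {"type": ["string", "number", "integer", "array", "object", "boolean"]}

# With dates_as_string=True, DATE and TIMESTAMP columns fall back to strings.
string_dates = PostgresSQLToJSONSchema(dates_as_string=True)
string_dates.to_jsonschema(sa.types.Date())
# -> {"type": ["string", "null"]}
```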