Skip to content

Commit 0a54268

Browse files
feat(taps): In OpenAPI schema normalization, treat all fields as nullable unless they are part of a primary key (#3369)
## Summary by Sourcery Treat all fields as nullable by default in OpenAPI schema normalization, marking only primary key properties as non-nullable by passing key_properties through preprocessing and normalization flows New Features: - Add key_properties parameter to schema preprocessing and normalization methods to distinguish primary keys Enhancements: - Propagate key_properties (stream.primary_keys) through SchemaSource and StreamSchema descriptor to schema preprocessing Tests: - Update tests to expect default nullable types for all fields except primary keys
1 parent 16ea7ad commit 0a54268

File tree

2 files changed

+74
-24
lines changed

2 files changed

+74
-24
lines changed

singer_sdk/schema/source.py

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from typing_extensions import TypeVar
3535

3636
if t.TYPE_CHECKING:
37-
from collections.abc import Callable
37+
from collections.abc import Callable, Sequence
3838

3939
from singer_sdk.helpers._compat import Traversable
4040
from singer_sdk.streams.core import Stream
@@ -64,11 +64,17 @@ class SchemaPreprocessor(t.Protocol):
6464
Implementations can normalize, validate, or modify schemas as needed.
6565
"""
6666

67-
def preprocess_schema(self, schema: Schema) -> Schema:
67+
def preprocess_schema(
68+
self,
69+
schema: Schema,
70+
*,
71+
key_properties: Sequence[str] = (),
72+
) -> Schema:
6873
"""Pre-process a schema.
6974
7075
Args:
7176
schema: A JSON schema to preprocess.
77+
key_properties: The stream's key properties.
7278
7379
Returns:
7480
The preprocessed schema.
@@ -84,25 +90,41 @@ def __init__(self) -> None:
8490
self._preprocessor: SchemaPreprocessor | None = None
8591

8692
@t.final
87-
def preprocess_schema(self, schema: Schema) -> Schema:
93+
def preprocess_schema(
94+
self,
95+
schema: Schema,
96+
*,
97+
key_properties: Sequence[str] = (),
98+
) -> Schema:
8899
"""Pre-process the schema before providing it to the stream.
89100
90101
Args:
91102
schema: A JSON schema.
103+
key_properties: The stream's key properties.
92104
93105
Returns:
94106
The pre-processed schema.
95107
"""
96108
if self._preprocessor is None:
97109
return schema
98-
return self._preprocessor.preprocess_schema(schema)
110+
return self._preprocessor.preprocess_schema(
111+
schema,
112+
key_properties=key_properties,
113+
)
99114

100115
@t.final
101-
def get_schema(self, key: _TKey, /) -> Schema:
116+
def get_schema(
117+
self,
118+
key: _TKey,
119+
/,
120+
*,
121+
key_properties: Sequence[str] = (),
122+
) -> Schema:
102123
"""Convenience method to get a schema component.
103124
104125
Args:
105126
key: The schema component name to retrieve.
127+
key_properties: The stream's key properties.
106128
107129
Returns:
108130
A JSON schema dictionary.
@@ -122,7 +144,10 @@ def get_schema(self, key: _TKey, /) -> Schema:
122144
raise SchemaNotValidError(msg)
123145
self._schema_cache[key] = schema
124146

125-
return self.preprocess_schema(self._schema_cache[key])
147+
return self.preprocess_schema(
148+
self._schema_cache[key],
149+
key_properties=key_properties,
150+
)
126151

127152
@abstractmethod
128153
def fetch_schema(self, key: _TKey) -> Schema:
@@ -203,7 +228,10 @@ def get_stream_schema(self, stream: Stream, stream_class: type[Stream]) -> Schem
203228
Returns:
204229
A JSON schema dictionary.
205230
"""
206-
return self.schema_source.get_schema(self.key or stream.name) # type: ignore[arg-type]
231+
return self.schema_source.get_schema(
232+
self.key or stream.name, # type: ignore[arg-type]
233+
key_properties=stream.primary_keys,
234+
)
207235

208236

209237
def _load_yaml(content: bytes) -> dict[str, t.Any]:
@@ -223,16 +251,24 @@ class OpenAPISchemaNormalizer(SchemaPreprocessor):
223251
- Recursively processes nested object properties and array items
224252
"""
225253

226-
def handle_object(self, schema: Schema) -> Schema:
254+
def handle_object(
255+
self,
256+
schema: Schema,
257+
*,
258+
key_properties: Sequence[str] = (),
259+
) -> Schema:
227260
"""Handle JSON object schemas.
228261
229262
Args:
230263
schema: A JSON schema.
264+
key_properties: The stream's key properties.
231265
232266
Returns:
233267
The processed object schema.
234268
"""
269+
schema["nullable"] = len(key_properties) == 0
235270
for prop, prop_schema in schema.get("properties", {}).items():
271+
prop_schema["nullable"] = prop not in key_properties
236272
schema["properties"][prop] = self.normalize_schema(prop_schema)
237273
return schema
238274

@@ -290,14 +326,20 @@ def handle_all_of(self, subschemas: list[Schema]) -> Schema: # noqa: PLR6301
290326

291327
return result
292328

293-
def normalize_schema(self, schema: Schema) -> Schema:
329+
def normalize_schema(
330+
self,
331+
schema: Schema,
332+
*,
333+
key_properties: Sequence[str] = (),
334+
) -> Schema:
294335
"""Normalize an OpenAPI schema to standard JSON Schema.
295336
296337
This method applies a series of transformations to convert OpenAPI-specific
297338
schema features (like 'nullable') into standard JSON Schema constructs.
298339
299340
Args:
300341
schema: The schema to normalize.
342+
key_properties: The stream's key properties.
301343
302344
Returns:
303345
A normalized schema dictionary.
@@ -306,7 +348,7 @@ def normalize_schema(self, schema: Schema) -> Schema:
306348
schema_type: str | list[str] = result.get("type", [])
307349

308350
if "object" in schema_type:
309-
result = self.handle_object(result)
351+
result = self.handle_object(result, key_properties=key_properties)
310352

311353
elif "array" in schema_type and (items := result.get("items")):
312354
result["items"] = self.handle_array_items(items)
@@ -330,8 +372,13 @@ def normalize_schema(self, schema: Schema) -> Schema:
330372
return result
331373

332374
@override
333-
def preprocess_schema(self, schema: Schema) -> Schema:
334-
return self.normalize_schema(schema)
375+
def preprocess_schema(
376+
self,
377+
schema: Schema,
378+
*,
379+
key_properties: Sequence[str] = (),
380+
) -> Schema:
381+
return self.normalize_schema(schema, key_properties=key_properties)
335382

336383

337384
class OpenAPISchema(SchemaSource[_TKey]):

tests/core/schema/test_source.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ def resolved_user_schema() -> dict[str, t.Any]:
4545
"type": "object",
4646
"properties": {
4747
"id": {"type": "string"},
48-
"name": {"type": "string"},
49-
"email": {"type": "string", "format": "email"},
48+
"name": {"type": ["string", "null"]},
49+
"email": {"type": ["string", "null"], "format": "email"},
5050
},
5151
"required": ["id", "name"],
5252
}
@@ -410,7 +410,7 @@ def test_openapi_component(
410410
openapi_file.write_text(content)
411411

412412
source = OpenAPISchema(openapi_file)
413-
result = source.get_schema("User")
413+
result = source.get_schema("User", key_properties=("id",))
414414
assert result == resolved_user_schema
415415

416416
def test_openapi_unknown_spec(
@@ -475,7 +475,7 @@ def test_openapi_schema_with_traversable(
475475
path.write_text(json.dumps(openapi_spec))
476476

477477
source = OpenAPISchema(path)
478-
result = source.get_schema("User")
478+
result = source.get_schema("User", key_properties=("id",))
479479
assert result == resolved_user_schema
480480

481481
def test_openapi_schema_with_unknown_file_type(
@@ -568,11 +568,11 @@ def get_unresolved_schema(self, key: PathAndMethodKey) -> dict[str, t.Any]:
568568
expected_schema = {
569569
"type": "array",
570570
"items": {
571-
"type": "object",
571+
"type": ["object", "null"],
572572
"properties": {
573-
"id": {"type": "string"},
574-
"name": {"type": "string"},
575-
"email": {"type": "string", "format": "email"},
573+
"id": {"type": ["string", "null"]},
574+
"name": {"type": ["string", "null"]},
575+
"email": {"type": ["string", "null"], "format": "email"},
576576
},
577577
"required": ["id", "name"],
578578
},
@@ -630,6 +630,7 @@ def test_stream_schema_descriptor(
630630
class FooStream:
631631
name = "foo"
632632
schema: t.ClassVar[StreamSchema] = StreamSchema(schema_source)
633+
primary_keys = ("id",)
633634

634635
stream = FooStream()
635636
assert stream.schema == foo_schema
@@ -644,6 +645,7 @@ def test_stream_schema_descriptor_with_explicit_key(
644645
class BarStream:
645646
name = "bar"
646647
schema: t.ClassVar[StreamSchema] = StreamSchema(schema_source, key="foo")
648+
primary_keys = ("id",)
647649

648650
stream = BarStream()
649651
assert stream.schema == foo_schema
@@ -657,6 +659,7 @@ def test_stream_schema_descriptor_key_not_found(
657659
class BarStream:
658660
name = "bar"
659661
schema: t.ClassVar[StreamSchema] = StreamSchema(schema_source)
662+
primary_keys = ("id",)
660663

661664
stream = BarStream()
662665
with pytest.raises(
@@ -804,14 +807,14 @@ def test_normalize(
804807
schema = source.get_schema("AllOfSchemas")
805808
normalized = source.preprocess_schema(schema)
806809
assert normalized == {
807-
"type": "object",
810+
"type": ["object", "null"],
808811
"properties": {
809812
"references": {
810-
"type": "object",
813+
"type": ["object", "null"],
811814
"additionalProperties": {"type": "string"},
812815
},
813-
"created": {"type": "string"},
814-
"name": {"type": "string"},
816+
"created": {"type": ["string", "null"]},
817+
"name": {"type": ["string", "null"]},
815818
},
816819
}
817820
assert "allOf" not in normalized

0 commit comments

Comments
 (0)