Skip to content

Commit 6b28a5c

Browse files
add schema discovery options validation
1 parent 10a3b4e commit 6b28a5c

File tree

3 files changed

+41
-12
lines changed

3 files changed

+41
-12
lines changed

airbyte_cdk/sources/file_based/config/file_based_stream_config.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from enum import Enum
66
from typing import Any, List, Mapping, Optional, Union
77

8-
from pydantic.v1 import BaseModel, Field, validator
8+
from pydantic.v1 import BaseModel, Field, validator, root_validator
99

1010
from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
1111
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
@@ -75,7 +75,7 @@ class FileBasedStreamConfig(BaseModel):
7575
gt=0,
7676
)
7777
use_first_found_file_for_schema_discovery: bool = Field(
78-
title="Use first found file for schema discovery",
78+
title="Use First Found File For Schema Discover",
7979
description="When enabled, the source will use the first found file for schema discovery. Helps to avoid long discovery step",
8080
default=False,
8181
)
@@ -89,6 +89,35 @@ def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
8989
raise ConfigValidationError(FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA)
9090
return None
9191

92+
@root_validator
93+
def validate_discovery_related_fields(cls, values):
94+
"""
95+
Please update this validation when a new field related to schema discovery is added.
96+
Validates schema discovery options compatibility.
97+
Note that recent_n_files_to_read_for_schema_discovery was initially added without validating whether schemaless or input_schema were also provided.
98+
So this method doesn't check those combinations, in order not to break already created connections.
99+
If recent_n_files_to_read_for_schema_discovery and schemaless or recent_n_files_to_read_for_schema_discovery and input_schema were provided,
100+
recent_n_files_to_read_for_schema_discovery will be ignored and the second option will be used by default.
101+
"""
102+
input_schema = values["input_schema"] is not None
103+
schemaless = values["schemaless"]
104+
recent_n_files_to_read_for_schema_discovery = (
105+
values["recent_n_files_to_read_for_schema_discovery"] is not None
106+
)
107+
use_first_found_file_for_schema_discovery = values[
108+
"use_first_found_file_for_schema_discovery"
109+
]
110+
111+
if (
112+
recent_n_files_to_read_for_schema_discovery
113+
and use_first_found_file_for_schema_discovery
114+
) or [schemaless, input_schema, use_first_found_file_for_schema_discovery].count(True) > 1:
115+
raise ConfigValidationError(
116+
FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS
117+
)
118+
119+
return values
120+
92121
def get_input_schema(self) -> Optional[Mapping[str, Any]]:
93122
"""
94123
User defined input_schema is defined as a string in the config. This method takes the string representation

airbyte_cdk/sources/file_based/exceptions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class FileBasedSourceError(Enum):
2323
"The provided schema could not be transformed into valid JSON Schema."
2424
)
2525
ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
26+
ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
2627
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
2728
ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
2829
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."

airbyte_cdk/sources/file_based/file_based_source.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,20 @@ def check_connection(
156156
"""
157157
try:
158158
streams = self.streams(config)
159-
except Exception as config_exception:
159+
except ConfigValidationError as config_exception:
160160
raise AirbyteTracedException(
161161
internal_message="Please check the logged errors for more information.",
162-
message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
162+
message=str(config_exception),
163163
exception=AirbyteTracedException(exception=config_exception),
164164
failure_type=FailureType.config_error,
165165
)
166+
except Exception as exp:
167+
raise AirbyteTracedException(
168+
internal_message="Please check the logged errors for more information.",
169+
message=FileBasedSourceError.CONFIG_VALIDATION_ERROR.value,
170+
exception=AirbyteTracedException(exception=exp),
171+
failure_type=FailureType.config_error,
172+
)
166173
if len(streams) == 0:
167174
return (
168175
False,
@@ -250,7 +257,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
250257
if (state_manager and catalog_stream)
251258
else None
252259
)
253-
self._validate_input_schema(stream_config)
254260

255261
sync_mode = self._get_sync_mode_from_catalog(stream_config.name)
256262

@@ -457,10 +463,3 @@ def _validate_and_get_validation_policy(
457463
model=FileBasedStreamConfig,
458464
)
459465
return self.validation_policies[stream_config.validation_policy]
460-
461-
def _validate_input_schema(self, stream_config: FileBasedStreamConfig) -> None:
462-
if stream_config.schemaless and stream_config.input_schema:
463-
raise ValidationError(
464-
"`input_schema` and `schemaless` options cannot both be set",
465-
model=FileBasedStreamConfig,
466-
)

0 commit comments

Comments
 (0)