Skip to content

Commit 8615bb9

Browse files
use only first found file for discover
1 parent f0443aa commit 8615bb9

File tree

3 files changed

+31
-0
lines changed

3 files changed

+31
-0
lines changed

airbyte_cdk/sources/file_based/config/file_based_stream_config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ class FileBasedStreamConfig(BaseModel):
7474
default=None,
7575
gt=0,
7676
)
77+
use_first_found_file_for_schema_discovery: Optional[bool] = Field(
78+
title="Use first found file for schema discovery",
79+
description="When enable, the source will use the first found file for schema discovery. Helps to avoid long discovery step",
80+
default=False,
81+
)
7782

7883
@validator("input_schema", pre=True)
7984
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,10 @@ def _get_raw_json_schema(self) -> JsonSchema:
273273
return self.config.get_input_schema() # type: ignore
274274
elif self.config.schemaless:
275275
return schemaless_schema
276+
elif self.config.use_first_found_file_for_schema_discovery:
277+
self.logger.info(msg=f"Using only first found file for schema discovery.")
278+
files = [next(iter(self.get_files()))]
279+
first_n_files = len(files)
276280
else:
277281
files = self.list_files()
278282
first_n_files = len(files)

unit_tests/sources/file_based/stream/test_default_file_based_stream.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,28 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None:
226226
}
227227
assert self._parser.infer_schema.call_count == 3
228228

229+
def test_use_first_found_file_for_schema_discovery(self) -> None:
230+
self._stream.config.use_first_found_file_for_schema_discovery = True
231+
232+
self._discovery_policy.get_max_n_files_for_schema_inference.return_value = 3
233+
self._discovery_policy.n_concurrent_requests = 1
234+
self._stream.config.input_schema = None
235+
self._stream.config.schemaless = None
236+
self._stream.config.recent_n_files_to_read_for_schema_discovery = None
237+
self._parser.infer_schema.return_value = {"data": {"type": "string"}}
238+
files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)]
239+
self._stream_reader.get_matching_files.return_value = files
240+
241+
schema = self._stream.get_json_schema()
242+
assert schema == {
243+
"properties": {
244+
"_ab_source_file_last_modified": {"type": "string"},
245+
"_ab_source_file_url": {"type": "string"},
246+
"data": {"type": ["null", "string"]},
247+
},
248+
"type": "object",
249+
}
250+
229251
def _iter(self, x: Iterable[Any]) -> Iterator[Any]:
230252
for item in x:
231253
if isinstance(item, Exception):

0 commit comments

Comments
 (0)