Skip to content

Commit 38d13d3

Browse files
feat: Add fetch_one/fetch_record functionality to SimpleRetriever
Implements Phase 1 of GitHub issue #833.

Changes:
- Add fetch_one() method to SimpleRetriever for fetching single records by PK
- Add fetch_record() base method to Stream class
- Implement fetch_record() in DeclarativeStream to delegate to retriever
- Add fetch_record() helper method to AbstractSource
- Add comprehensive unit tests for fetch_one functionality

The implementation uses convention-based path construction (appending the PK value to the base path) and supports both simple string PKs and composite dict PKs. It handles 404 responses gracefully by returning None.

Co-Authored-By: AJ Steers <[email protected]>
1 parent f0443aa commit 38d13d3

File tree

5 files changed

+388
-0
lines changed

5 files changed

+388
-0
lines changed

airbyte_cdk/legacy/sources/declarative/declarative_stream.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,29 @@ def get_cursor(self) -> Optional[Cursor]:
202202
return self.retriever.cursor
203203
return None
204204

205+
def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
    """
    Look up one record of this stream by its primary key.

    The lookup is delegated to the stream's retriever, which must be a
    SimpleRetriever; the stream's JSON schema is passed along with the key.

    Args:
        pk_value: The primary key value. Can be:
            - str: For simple single-field primary keys (e.g., "123")
            - Mapping[str, Any]: For composite primary keys
              (e.g., {"company_id": "123", "property": "status"})

    Returns:
        Whatever the retriever's fetch_one() returns: the record as a dict,
        or None if it was not found

    Raises:
        NotImplementedError: If the stream's retriever is not a SimpleRetriever
    """
    retriever = self.retriever

    # Happy path first: only SimpleRetriever exposes fetch_one().
    if isinstance(retriever, SimpleRetriever):
        return retriever.fetch_one(pk_value=pk_value, records_schema=self.get_json_schema())

    raise NotImplementedError(
        f"Stream {self.name} does not support fetching individual records. "
        "Only streams with SimpleRetriever currently support this operation."
    )
227+
205228
def _get_checkpoint_reader(
206229
self,
207230
logger: logging.Logger,

airbyte_cdk/sources/abstract_source.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,34 @@ def stop_sync_on_stream_failure(self) -> bool:
324324
on the first error seen and emit a single error trace message for that stream.
325325
"""
326326
return False
327+
328+
def fetch_record(
    self, stream_name: str, pk_value: Any, config: Mapping[str, Any]
) -> Optional[Mapping[str, Any]]:
    """
    Fetch a single record from a stream by primary key.

    The source's streams are instantiated from ``config``, indexed by name, and
    the request is delegated to the matching stream's ``fetch_record``.

    Args:
        stream_name: Name of the stream to fetch from
        pk_value: Primary key value to fetch. Can be:
            - str: For simple single-field primary keys (e.g., "123")
            - Mapping[str, Any]: For composite primary keys
              (e.g., {"company_id": "123", "property": "status"})
        config: Source configuration, passed to ``self.streams``

    Returns:
        The value returned by the stream's ``fetch_record``: the fetched record
        as a dict, or None if not found

    Raises:
        ValueError: If the stream name is not found in the source
        NotImplementedError: If the stream doesn't support fetching individual records
    """
    stream_instances = {s.name: s for s in self.streams(config)}
    stream = stream_instances.get(stream_name)

    # Compare against None explicitly: a stream object that happens to evaluate
    # falsy (e.g. it defines __len__ or __bool__) is still a valid match and
    # must not be reported as missing.
    if stream is None:
        raise ValueError(
            f"Stream '{stream_name}' not found in source. "
            f"Available streams: {', '.join(stream_instances)}"
        )

    return stream.fetch_record(pk_value)

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,123 @@ def _to_partition_key(to_serialize: Any) -> str:
626626
# separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
627627
return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
628628

629+
def fetch_one(
    self,
    pk_value: Union[str, Mapping[str, Any]],
    records_schema: Mapping[str, Any],
) -> Optional[Mapping[str, Any]]:
    """
    Fetch a single record by primary key value.

    The request path is built by appending the primary key to the requester's
    base path, following the REST convention ``GET /resource/{id}``:

    - simple PK: ``{base_path}/{pk_value}``
    - composite PK: ``{base_path}/{value1}/{value2}/...`` with the values
      ordered by their sorted key names

    Each key component is percent-encoded as one path segment, so characters
    such as ``/``, ``?`` or ``#`` inside a key cannot redirect the request to a
    different endpoint. Only trailing slashes of the base path are trimmed; the
    rest of the base path is left untouched.

    Args:
        pk_value: The primary key value to fetch. Can be:
            - str: For simple single-field primary keys (e.g., "123")
            - Mapping[str, Any]: For composite primary keys
              (e.g., {"company_id": "123", "property": "status"})
        records_schema: JSON schema describing the record structure; forwarded
            to response parsing

    Returns:
        The first parsed record as a dict. None is returned when the request
        raises an error carrying a 404 response, when the response is falsy,
        when parsing yields no records, or when the first parsed item is
        neither a Record nor a Mapping.

    Raises:
        ValueError: If pk_value is neither a string nor a mapping, or if it is
            empty / contains an empty component (such a path would address the
            collection endpoint instead of a single record)
        Exception: Any non-404 error raised while sending the request is
            re-raised unchanged

    Example:
        record = retriever.fetch_one("123", schema)

        record = retriever.fetch_one({"company_id": "123", "property": "status"}, schema)

    Note:
        This implementation uses convention-based path construction (Option B from design).

        Alternative approaches that could be implemented in the future:
        - Path template interpolation: Use a configurable template like "{base_path}/{id}"
          See: https://github.com/airbytehq/airbyte-python-cdk/issues/833#phase-1a
        - Field path configuration: Allow specifying which response field contains the record
          for APIs that wrap single records in envelopes like {"data": {...}}
    """
    # Imported locally so the method is self-contained with respect to the
    # module's import block.
    from urllib.parse import quote

    if isinstance(pk_value, str):
        pk_segments = [pk_value]
    elif isinstance(pk_value, Mapping):
        pk_segments = [str(pk_value[key]) for key in sorted(pk_value)]
    else:
        raise ValueError(f"pk_value must be a string or dict, got {type(pk_value).__name__}")

    # An empty component would produce "base/" or "base//x", i.e. a different
    # endpoint whose first listed record would be returned as if it matched.
    if not pk_segments or any(segment == "" for segment in pk_segments):
        raise ValueError(
            "pk_value must not be empty: every primary key component needs a value"
        )

    # A single empty slice is used for every call below: there is no partition
    # or cursor context when addressing one record directly.
    stream_slice = StreamSlice(partition={}, cursor_slice={})

    base_path = self.requester.get_path(
        stream_state={},
        stream_slice=stream_slice,
        next_page_token=None,
    )

    # Trim only the join point and encode each key component as one segment.
    encoded_segments = [quote(segment, safe="") for segment in pk_segments]
    fetch_path = "/".join([base_path.rstrip("/"), *encoded_segments])

    try:
        response = self.requester.send_request(
            path=fetch_path,
            stream_state={},
            stream_slice=stream_slice,
            next_page_token=None,
            request_headers=self._request_headers(
                stream_state={},
                stream_slice=stream_slice,
                next_page_token=None,
            ),
            request_params=self._request_params(
                stream_state={},
                stream_slice=stream_slice,
                next_page_token=None,
            ),
            request_body_data=self._request_body_data(
                stream_state={},
                stream_slice=stream_slice,
                next_page_token=None,
            ),
            request_body_json=self._request_body_json(
                stream_state={},
                stream_slice=stream_slice,
                next_page_token=None,
            ),
            log_formatter=self.log_formatter,
        )
    except Exception as error:
        # The concrete exception type raised by the requester is not visible
        # here, so the 404 case is detected by duck-typing on `.response`.
        # Everything else propagates untouched.
        status_code = getattr(getattr(error, "response", None), "status_code", None)
        if status_code == 404:
            return None
        raise

    # Covers a None response as well as any response object that evaluates falsy.
    if not response:
        return None

    parsed_records = self._parse_response(
        response=response,
        stream_state={},
        records_schema=records_schema,
        stream_slice=stream_slice,
        next_page_token=None,
    )

    # Only the first record is needed, so the rest are not materialised.
    first_record = next(iter(parsed_records), None)

    if isinstance(first_record, Record):
        return dict(first_record.data)
    if isinstance(first_record, Mapping):
        return dict(first_record)
    return None
745+
629746

630747
def _deep_merge(
631748
target: MutableMapping[str, Any], source: Union[Record, MutableMapping[str, Any]]

airbyte_cdk/sources/streams/core.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,26 @@ def get_cursor(self) -> Optional[Cursor]:
463463
"""
464464
return self.cursor
465465

466+
def fetch_record(self, pk_value: Any) -> Optional[Mapping[str, Any]]:
    """
    Look up one record of this stream by its primary key.

    This base implementation always raises; subclasses that can address a
    single record override it.

    Args:
        pk_value: The primary key value. Can be:
            - str: For simple single-field primary keys (e.g., "123")
            - Mapping[str, Any]: For composite primary keys
              (e.g., {"company_id": "123", "property": "status"})

    Returns:
        The fetched record as a dict, or None if not found (overriding
        implementations only; this one never returns)

    Raises:
        NotImplementedError: Always, in this base implementation
    """
    unsupported_message = (
        f"Stream {self.name} does not support fetching individual records. "
        "Only declarative streams with SimpleRetriever currently support this operation."
    )
    raise NotImplementedError(unsupported_message)
485+
466486
def _get_checkpoint_reader(
467487
self,
468488
logger: logging.Logger,

0 commit comments

Comments
 (0)