Skip to content

Commit c88aeed

Browse files
docs: Add Google-style docstrings to core Python CDK modules
- Update AbstractSource class and methods with concise docstrings
- Update HttpStream class and abstract methods with clear documentation
- Update YamlDeclarativeSource with initialization parameter docs
- Convert old :param: style to Google-style Args/Returns format
- Focus on public-facing APIs used by connector developers

Co-Authored-By: AJ Steers <[email protected]>
1 parent 80b7668 commit c88aeed

File tree

3 files changed

+56
-63
lines changed

3 files changed

+56
-63
lines changed

airbyte_cdk/sources/abstract_source.py

Lines changed: 25 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -50,49 +50,42 @@
5050

5151

5252
class AbstractSource(Source, ABC):
53-
"""
54-
Abstract base class for an Airbyte Source. Consumers should implement any abstract methods
55-
in this class to create an Airbyte Specification compliant Source.
56-
"""
53+
"""Base class for Airbyte source connectors that orchestrates stream reading and state management."""
5754

5855
@abstractmethod
5956
def check_connection(
6057
self, logger: logging.Logger, config: Mapping[str, Any]
6158
) -> Tuple[bool, Optional[Any]]:
62-
"""
63-
:param logger: source logger
64-
:param config: The user-provided configuration as specified by the source's spec.
65-
This usually contains information required to check connection e.g. tokens, secrets and keys etc.
66-
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
67-
and we can connect to the underlying data source using the provided configuration.
68-
Otherwise, the input config cannot be used to connect to the underlying data source,
69-
and the "error" object should describe what went wrong.
70-
The error object will be cast to string to display the problem to the user.
59+
"""Validates that the provided configuration can successfully connect to the data source.
60+
61+
Args:
62+
logger: Source logger for diagnostic output.
63+
config: User-provided configuration containing credentials and connection parameters.
64+
65+
Returns:
66+
Tuple of (success boolean, error object). If success is True, connection is valid.
67+
If False, error object describes what went wrong and will be displayed to the user.
7168
"""
7269

7370
@abstractmethod
7471
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
75-
"""
76-
:param config: The user-provided configuration as specified by the source's spec.
77-
Any stream construction related operation should happen here.
78-
:return: A list of the streams in this source connector.
72+
"""Returns the list of streams available in this source connector.
73+
74+
Args:
75+
config: User-provided configuration for initializing streams.
7976
"""
8077

8178
# Stream name to instance map for applying output object transformation
8279
_stream_to_instance_map: Dict[str, Stream] = {}
8380
_slice_logger: SliceLogger = DebugSliceLogger()
8481

8582
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
86-
"""Implements the Discover operation from the Airbyte Specification.
87-
See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#discover.
88-
"""
83+
"""Discovers available streams and their schemas from the data source."""
8984
streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
9085
return AirbyteCatalog(streams=streams)
9186

9287
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
93-
"""Implements the Check Connection operation from the Airbyte Specification.
94-
See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/#check.
95-
"""
88+
"""Validates connection to the data source using the provided configuration."""
9689
check_succeeded, error = self.check_connection(logger, config)
9790
if not check_succeeded:
9891
return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
@@ -105,7 +98,7 @@ def read(
10598
catalog: ConfiguredAirbyteCatalog,
10699
state: Optional[List[AirbyteStateMessage]] = None,
107100
) -> Iterator[AirbyteMessage]:
108-
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.com/understanding-airbyte/airbyte-protocol/."""
101+
"""Reads records from configured streams and emits them as Airbyte messages."""
109102
logger.info(f"Starting syncing {self.name}")
110103
config, internal_config = split_config(config)
111104
# TODO assert all streams exist in the connector
@@ -214,6 +207,7 @@ def read(
214207
def _serialize_exception(
215208
stream_descriptor: StreamDescriptor, e: Exception, stream_instance: Optional[Stream] = None
216209
) -> AirbyteTracedException:
210+
"""Converts an exception into an AirbyteTracedException with optional stream-specific error message."""
217211
display_message = stream_instance.get_error_display_message(e) if stream_instance else None
218212
if display_message:
219213
return AirbyteTracedException.from_exception(
@@ -223,6 +217,7 @@ def _serialize_exception(
223217

224218
@property
225219
def raise_exception_on_missing_stream(self) -> bool:
220+
"""Controls whether to raise an exception when a configured stream is not found in the source."""
226221
return False
227222

228223
def _read_stream(
@@ -233,6 +228,7 @@ def _read_stream(
233228
state_manager: ConnectorStateManager,
234229
internal_config: InternalConfig,
235230
) -> Iterator[AirbyteMessage]:
231+
"""Reads records from a single stream and emits them as Airbyte messages."""
236232
if internal_config.page_size and isinstance(stream_instance, HttpStream):
237233
logger.info(
238234
f"Setting page size for {stream_instance.name} to {internal_config.page_size}"
@@ -289,16 +285,15 @@ def _read_stream(
289285
logger.info(f"Read {record_counter} records from {stream_name} stream")
290286

291287
def _emit_queued_messages(self) -> Iterable[AirbyteMessage]:
288+
"""Emits any messages that have been queued in the message repository."""
292289
if self.message_repository:
293290
yield from self.message_repository.consume_queue()
294291
return
295292

296293
def _get_message(
297294
self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream
298295
) -> AirbyteMessage:
299-
"""
300-
Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
301-
"""
296+
"""Converts StreamData to AirbyteMessage or returns the input if already an AirbyteMessage."""
302297
match record_data_or_message:
303298
case AirbyteMessage():
304299
return record_data_or_message
@@ -312,11 +307,13 @@ def _get_message(
312307

313308
@property
314309
def message_repository(self) -> Union[None, MessageRepository]:
310+
"""Returns the message repository used for queuing messages during sync operations."""
315311
return _default_message_repository
316312

317313
@property
318314
def stop_sync_on_stream_failure(self) -> bool:
319-
"""
315+
"""Controls whether to stop the entire sync when a single stream fails.
316+
320317
WARNING: This function is in-development which means it is subject to change. Use at your own risk.
321318
322319
By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then

airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 10 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@
1515

1616

1717
class YamlDeclarativeSource(ConcurrentDeclarativeSource):
18-
"""Declarative source defined by a yaml file"""
18+
"""Entry point for declarative YAML-based source connectors that loads and executes manifest files."""
1919

2020
def __init__(
2121
self,
@@ -26,8 +26,15 @@ def __init__(
2626
state: Optional[List[AirbyteStateMessage]] = None,
2727
config_path: Optional[str] = None,
2828
) -> None:
29-
"""
30-
:param path_to_yaml: Path to the yaml file describing the source
29+
"""Initializes a declarative source from a YAML manifest file.
30+
31+
Args:
32+
path_to_yaml: Path to the manifest YAML file describing the source.
33+
debug: Enable debug logging for manifest parsing and execution.
34+
catalog: Configured catalog for the sync operation.
35+
config: User-provided configuration for the source.
36+
state: Current state for incremental syncs.
37+
config_path: Path to the configuration file.
3138
"""
3239
self._path_to_yaml = path_to_yaml
3340
source_config = self._read_and_parse_yaml_file(path_to_yaml)

airbyte_cdk/sources/streams/http/http.py

Lines changed: 21 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -42,9 +42,7 @@
4242

4343

4444
class HttpStream(Stream, CheckpointMixin, ABC):
45-
"""
46-
Base abstract class for an Airbyte Stream using the HTTP protocol. Basic building block for users building an Airbyte source for a HTTP API.
47-
"""
45+
"""Base class for streams that fetch data from HTTP APIs with built-in pagination and error handling."""
4846

4947
source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
5048
page_size: Optional[int] = (
@@ -108,15 +106,11 @@ def use_cache(self) -> bool:
108106
@property
109107
@abstractmethod
110108
def url_base(self) -> str:
111-
"""
112-
:return: URL base for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "https://myapi.com/v1/"
113-
"""
109+
"""Returns the base URL for API requests (e.g., "https://api.example.com/v1/")."""
114110

115111
@property
116112
def http_method(self) -> str:
117-
"""
118-
Override if needed. See get_request_data/get_request_json if using POST/PUT/PATCH.
119-
"""
113+
"""Returns the HTTP method to use for requests (default: "GET")."""
120114
return "GET"
121115

122116
@property
@@ -165,12 +159,13 @@ def retry_factor(self) -> float:
165159

166160
@abstractmethod
167161
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
168-
"""
169-
Override this method to define a pagination strategy.
162+
"""Returns the token for the next page of results, or None if no more pages exist.
170163
171-
The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
164+
Args:
165+
response: HTTP response from the current page request.
172166
173-
:return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
167+
Returns:
168+
Mapping containing pagination token, or None if pagination is complete.
174169
"""
175170

176171
@abstractmethod
@@ -181,21 +176,15 @@ def path(
181176
stream_slice: Optional[Mapping[str, Any]] = None,
182177
next_page_token: Optional[Mapping[str, Any]] = None,
183178
) -> str:
184-
"""
185-
Returns the URL path for the API endpoint e.g: if you wanted to hit https://myapi.com/v1/some_entity then this should return "some_entity"
186-
"""
179+
"""Returns the URL path for the API endpoint (e.g., "users" or "v2/customers")."""
187180

188181
def request_params(
189182
self,
190183
stream_state: Optional[Mapping[str, Any]],
191184
stream_slice: Optional[Mapping[str, Any]] = None,
192185
next_page_token: Optional[Mapping[str, Any]] = None,
193186
) -> MutableMapping[str, Any]:
194-
"""
195-
Override this method to define the query parameters that should be set on an outgoing HTTP request given the inputs.
196-
197-
E.g: you might want to define query parameters for paging if next_page_token is not None.
198-
"""
187+
"""Returns query parameters to include in the HTTP request."""
199188
return {}
200189

201190
def request_headers(
@@ -204,9 +193,7 @@ def request_headers(
204193
stream_slice: Optional[Mapping[str, Any]] = None,
205194
next_page_token: Optional[Mapping[str, Any]] = None,
206195
) -> Mapping[str, Any]:
207-
"""
208-
Override to return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
209-
"""
196+
"""Returns non-authentication headers to include in the HTTP request."""
210197
return {}
211198

212199
def request_body_data(
@@ -261,14 +248,16 @@ def parse_response(
261248
stream_slice: Optional[Mapping[str, Any]] = None,
262249
next_page_token: Optional[Mapping[str, Any]] = None,
263250
) -> Iterable[Mapping[str, Any]]:
264-
"""
265-
Parses the raw response object into a list of records.
266-
By default, this returns an iterable containing the input. Override to parse differently.
267-
:param response:
268-
:param stream_state:
269-
:param stream_slice:
270-
:param next_page_token:
271-
:return: An iterable containing the parsed response
251+
"""Parses the HTTP response into an iterable of record dictionaries.
252+
253+
Args:
254+
response: HTTP response object from the API request.
255+
stream_state: Current state for incremental syncs.
256+
stream_slice: Current partition being processed.
257+
next_page_token: Token for the current page of results.
258+
259+
Returns:
260+
Iterable of record dictionaries extracted from the response.
272261
"""
273262

274263
def get_backoff_strategy(self) -> Optional[Union[BackoffStrategy, List[BackoffStrategy]]]:

0 commit comments

Comments
 (0)