Skip to content

Commit e36067a

Browse files
committed
Merge branch 'main' into pnilan/feat/extend-spec-class-for-config-migrations
2 parents ff10aa3 + dc10839 commit e36067a

File tree

13 files changed

+817
-184
lines changed

13 files changed

+817
-184
lines changed

airbyte_cdk/cli/airbyte_cdk/_secrets.py

Lines changed: 107 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from rich.console import Console
4444
from rich.table import Table
4545

46+
from airbyte_cdk.cli.airbyte_cdk.exceptions import ConnectorSecretWithNoValidVersionsError
4647
from airbyte_cdk.utils.connector_paths import (
4748
resolve_connector_name,
4849
resolve_connector_name_and_directory,
@@ -131,24 +132,46 @@ def fetch(
131132
)
132133
# Fetch and write secrets
133134
secret_count = 0
135+
exceptions = []
136+
134137
for secret in secrets:
135138
secret_file_path = _get_secret_filepath(
136139
secrets_dir=secrets_dir,
137140
secret=secret,
138141
)
139-
_write_secret_file(
140-
secret=secret,
141-
client=client,
142-
file_path=secret_file_path,
142+
try:
143+
_write_secret_file(
144+
secret=secret,
145+
client=client,
146+
file_path=secret_file_path,
147+
connector_name=connector_name,
148+
gcp_project_id=gcp_project_id,
149+
)
150+
click.echo(f"Secret written to: {secret_file_path.absolute()!s}", err=True)
151+
secret_count += 1
152+
except ConnectorSecretWithNoValidVersionsError as e:
153+
exceptions.append(e)
154+
click.echo(
155+
f"Failed to retrieve secret '{e.secret_name}': No enabled version found", err=True
156+
)
157+
158+
if secret_count == 0 and not exceptions:
159+
click.echo(
160+
f"No secrets found for connector: '{connector_name}'",
161+
err=True,
143162
)
144-
click.echo(f"Secret written to: {secret_file_path.absolute()!s}", err=True)
145-
secret_count += 1
146163

147-
if secret_count == 0:
164+
if exceptions:
165+
error_message = f"Failed to retrieve {len(exceptions)} secret(s)"
148166
click.echo(
149-
f"No secrets found for connector: '{connector_name}'",
167+
style(
168+
error_message,
169+
fg="red",
170+
),
150171
err=True,
151172
)
173+
if secret_count == 0:
174+
raise exceptions[0]
152175

153176
if not print_ci_secrets_masks:
154177
return
@@ -230,9 +253,8 @@ def list_(
230253
table.add_column("Created", justify="left", style="blue", overflow="fold")
231254
for secret in secrets:
232255
full_secret_name = secret.name
233-
secret_name = full_secret_name.split("/secrets/")[-1] # Removes project prefix
234-
# E.g. https://console.cloud.google.com/security/secret-manager/secret/SECRET_SOURCE-SHOPIFY__CREDS/versions?hl=en&project=<gcp_project_id>
235-
secret_url = f"https://console.cloud.google.com/security/secret-manager/secret/{secret_name}/versions?hl=en&project={gcp_project_id}"
256+
secret_name = _extract_secret_name(full_secret_name)
257+
secret_url = _get_secret_url(secret_name, gcp_project_id)
236258
table.add_row(
237259
f"[link={secret_url}]{secret_name}[/link]",
238260
"\n".join([f"{k}={v}" for k, v in secret.labels.items()]),
@@ -242,6 +264,43 @@ def list_(
242264
console.print(table)
243265

244266

267+
def _extract_secret_name(secret_name: str) -> str:
268+
"""Extract the secret name from a fully qualified secret path.
269+
270+
Handles different formats of secret names:
271+
- Full path: "projects/project-id/secrets/SECRET_NAME"
272+
- Already extracted: "SECRET_NAME"
273+
274+
Args:
275+
secret_name: The secret name or path
276+
277+
Returns:
278+
str: The extracted secret name without project prefix
279+
"""
280+
if "/secrets/" in secret_name:
281+
return secret_name.split("/secrets/")[-1]
282+
return secret_name
283+
284+
285+
def _get_secret_url(secret_name: str, gcp_project_id: str) -> str:
286+
"""Generate a URL for a secret in the GCP Secret Manager console.
287+
288+
Note: This URL itself does not contain secrets or sensitive information.
289+
The URL itself is only useful for valid logged-in users of the project, and it
290+
safe to print this URL in logs.
291+
292+
Args:
293+
secret_name: The name of the secret in GCP.
294+
gcp_project_id: The GCP project ID.
295+
296+
Returns:
297+
str: URL to the secret in the GCP console
298+
"""
299+
# Ensure we have just the secret name without the project prefix
300+
secret_name = _extract_secret_name(secret_name)
301+
return f"https://console.cloud.google.com/security/secret-manager/secret/{secret_name}/versions?hl=en&project={gcp_project_id}"
302+
303+
245304
def _fetch_secret_handles(
246305
connector_name: str,
247306
gcp_project_id: str = AIRBYTE_INTERNAL_GCP_PROJECT,
@@ -272,9 +331,44 @@ def _write_secret_file(
272331
secret: "Secret", # type: ignore
273332
client: "secretmanager.SecretManagerServiceClient", # type: ignore
274333
file_path: Path,
334+
connector_name: str,
335+
gcp_project_id: str,
275336
) -> None:
276-
version_name = f"{secret.name}/versions/latest"
277-
response = client.access_secret_version(name=version_name)
337+
"""Write the most recent enabled version of a secret to a file.
338+
339+
Lists all enabled versions of the secret and selects the most recent one.
340+
Raises ConnectorSecretWithNoValidVersionsError if no enabled versions are found.
341+
342+
Args:
343+
secret: The secret to write to a file
344+
client: The Secret Manager client
345+
file_path: The path to write the secret to
346+
connector_name: The name of the connector
347+
gcp_project_id: The GCP project ID
348+
349+
Raises:
350+
ConnectorSecretWithNoValidVersionsError: If no enabled version is found
351+
"""
352+
# List all enabled versions of the secret.
353+
response = client.list_secret_versions(
354+
request={"parent": secret.name, "filter": "state:ENABLED"}
355+
)
356+
357+
# The API returns versions pre-sorted in descending order, with the
358+
# 0th item being the latest version.
359+
versions = list(response)
360+
361+
if not versions:
362+
secret_name = _extract_secret_name(secret.name)
363+
raise ConnectorSecretWithNoValidVersionsError(
364+
connector_name=connector_name,
365+
secret_name=secret_name,
366+
gcp_project_id=gcp_project_id,
367+
)
368+
369+
enabled_version = versions[0]
370+
371+
response = client.access_secret_version(name=enabled_version.name)
278372
file_path.write_text(response.payload.data.decode("UTF-8"))
279373
file_path.chmod(0o600) # default to owner read/write only
280374

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
"""Exceptions for the Airbyte CDK CLI."""
3+
4+
from dataclasses import dataclass
5+
6+
7+
@dataclass(kw_only=True)
8+
class ConnectorSecretWithNoValidVersionsError(Exception):
9+
"""Error when a connector secret has no valid versions."""
10+
11+
connector_name: str
12+
secret_name: str
13+
gcp_project_id: str
14+
15+
def __str__(self) -> str:
16+
"""Return a string representation of the exception."""
17+
from airbyte_cdk.cli.airbyte_cdk._secrets import _get_secret_url
18+
19+
url = _get_secret_url(self.secret_name, self.gcp_project_id)
20+
return (
21+
f"No valid versions found for secret '{self.secret_name}' in connector '{self.connector_name}'. "
22+
f"Please check the following URL for more information:\n- {url}"
23+
)

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ def _emit_state_message(self, throttle: bool = True) -> None:
242242
if current_time is None:
243243
return
244244
self._last_emission_time = current_time
245+
# Skip state emit for global cursor if parent state is empty
246+
if self._use_global_cursor and not self._parent_state:
247+
return
248+
245249
self._connector_state_manager.update_state_for_stream(
246250
self._stream_name,
247251
self._stream_namespace,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from isodate import parse_duration
2929
from pydantic.v1 import BaseModel
30+
from requests import Response
3031

3132
from airbyte_cdk.connector_builder.models import (
3233
LogMessage as ConnectorBuilderLogMessage,
@@ -562,6 +563,7 @@
562563
PredicateValidator,
563564
ValidateAdheresToSchema,
564565
)
566+
from airbyte_cdk.sources.http_logger import format_http_message
565567
from airbyte_cdk.sources.message import (
566568
InMemoryMessageRepository,
567569
LogAppenderMessageRepositoryDecorator,
@@ -1582,7 +1584,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
15821584
)
15831585
)
15841586
stream_state = self.apply_stream_state_migrations(stream_state_migrations, stream_state)
1585-
15861587
# Per-partition state doesn't make sense for GroupingPartitionRouter, so force the global state
15871588
use_global_cursor = isinstance(
15881589
partition_router, GroupingPartitionRouter
@@ -2490,15 +2491,24 @@ def create_dynamic_schema_loader(
24902491
schema_transformations.append(
24912492
self._create_component_from_model(model=transformation_model, config=config)
24922493
)
2493-
2494+
name = "dynamic_properties"
24942495
retriever = self._create_component_from_model(
24952496
model=model.retriever,
24962497
config=config,
2497-
name="dynamic_properties",
2498+
name=name,
24982499
primary_key=None,
24992500
stream_slicer=combined_slicers,
25002501
transformations=[],
25012502
use_cache=True,
2503+
log_formatter=(
2504+
lambda response: format_http_message(
2505+
response,
2506+
f"Schema loader '{name}' request",
2507+
f"Request performed in order to extract schema.",
2508+
name,
2509+
is_auxiliary=True,
2510+
)
2511+
),
25022512
)
25032513
schema_type_identifier = self._create_component_from_model(
25042514
model.schema_type_identifier, config=config, parameters=model.parameters or {}
@@ -3085,6 +3095,7 @@ def create_simple_retriever(
30853095
]
30863096
] = None,
30873097
use_cache: Optional[bool] = None,
3098+
log_formatter: Optional[Callable[[Response], Any]] = None,
30883099
**kwargs: Any,
30893100
) -> SimpleRetriever:
30903101
def _get_url() -> str:
@@ -3261,6 +3272,7 @@ def _get_url() -> str:
32613272
config=config,
32623273
maximum_number_of_slices=self._limit_slices_fetched or 5,
32633274
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
3275+
log_formatter=log_formatter,
32643276
parameters=model.parameters or {},
32653277
)
32663278
return SimpleRetriever(

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -320,14 +320,14 @@ def _get_polling_response_interpolation_context(self, job: AsyncJob) -> Dict[str
320320
return polling_response_context
321321

322322
def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
323-
stream_slice = StreamSlice(
324-
partition={},
325-
cursor_slice={},
326-
extra_fields={
323+
return StreamSlice(
324+
partition=job.job_parameters().partition,
325+
cursor_slice=job.job_parameters().cursor_slice,
326+
extra_fields=dict(job.job_parameters().extra_fields)
327+
| {
327328
"creation_response": self._get_creation_response_interpolation_context(job),
328329
},
329330
)
330-
return stream_slice
331331

332332
def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
333333
if not self.download_target_requester:

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_nested_request_input_provider.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
)
1212
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1313
from airbyte_cdk.sources.types import Config, StreamSlice
14+
from airbyte_cdk.utils.mapping_helpers import get_interpolation_context
1415

1516

1617
@dataclass
@@ -52,8 +53,8 @@ def eval_request_inputs(
5253
:param next_page_token: The pagination token
5354
:return: The request inputs to set on an outgoing HTTP request
5455
"""
55-
kwargs = {
56-
"stream_slice": stream_slice,
57-
"next_page_token": next_page_token,
58-
}
56+
kwargs = get_interpolation_context(
57+
stream_slice=stream_slice,
58+
next_page_token=next_page_token,
59+
)
5960
return self._interpolator.eval(self.config, **kwargs) # type: ignore # self._interpolator is always initialized with a value and will not be None

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_input_provider.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
99
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1010
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
11+
from airbyte_cdk.utils.mapping_helpers import get_interpolation_context
1112

1213

1314
@dataclass
@@ -51,10 +52,10 @@ def eval_request_inputs(
5152
:param valid_value_types: A tuple of types that the interpolator should allow
5253
:return: The request inputs to set on an outgoing HTTP request
5354
"""
54-
kwargs = {
55-
"stream_slice": stream_slice,
56-
"next_page_token": next_page_token,
57-
}
55+
kwargs = get_interpolation_context(
56+
stream_slice=stream_slice,
57+
next_page_token=next_page_token,
58+
)
5859
interpolated_value = self._interpolator.eval( # type: ignore # self._interpolator is always initialized with a value and will not be None
5960
self.config,
6061
valid_key_types=valid_key_types,

0 commit comments

Comments
 (0)