Skip to content

Commit 90d4f1b

Browse files
committed
Merge remote-tracking branch 'origin/main' into aj/feat/cli/add-docker-mode-fast-test-option (manual merge resolution)
2 parents d631c34 + 5d9cfff commit 90d4f1b

File tree

15 files changed

+774
-56
lines changed

15 files changed

+774
-56
lines changed

airbyte_cdk/cli/airbyte_cdk/_secrets.py

Lines changed: 107 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from rich.console import Console
4444
from rich.table import Table
4545

46+
from airbyte_cdk.cli.airbyte_cdk.exceptions import ConnectorSecretWithNoValidVersionsError
4647
from airbyte_cdk.utils.connector_paths import (
4748
resolve_connector_name,
4849
resolve_connector_name_and_directory,
@@ -131,24 +132,46 @@ def fetch(
131132
)
132133
# Fetch and write secrets
133134
secret_count = 0
135+
exceptions = []
136+
134137
for secret in secrets:
135138
secret_file_path = _get_secret_filepath(
136139
secrets_dir=secrets_dir,
137140
secret=secret,
138141
)
139-
_write_secret_file(
140-
secret=secret,
141-
client=client,
142-
file_path=secret_file_path,
142+
try:
143+
_write_secret_file(
144+
secret=secret,
145+
client=client,
146+
file_path=secret_file_path,
147+
connector_name=connector_name,
148+
gcp_project_id=gcp_project_id,
149+
)
150+
click.echo(f"Secret written to: {secret_file_path.absolute()!s}", err=True)
151+
secret_count += 1
152+
except ConnectorSecretWithNoValidVersionsError as e:
153+
exceptions.append(e)
154+
click.echo(
155+
f"Failed to retrieve secret '{e.secret_name}': No enabled version found", err=True
156+
)
157+
158+
if secret_count == 0 and not exceptions:
159+
click.echo(
160+
f"No secrets found for connector: '{connector_name}'",
161+
err=True,
143162
)
144-
click.echo(f"Secret written to: {secret_file_path.absolute()!s}", err=True)
145-
secret_count += 1
146163

147-
if secret_count == 0:
164+
if exceptions:
165+
error_message = f"Failed to retrieve {len(exceptions)} secret(s)"
148166
click.echo(
149-
f"No secrets found for connector: '{connector_name}'",
167+
style(
168+
error_message,
169+
fg="red",
170+
),
150171
err=True,
151172
)
173+
if secret_count == 0:
174+
raise exceptions[0]
152175

153176
if not print_ci_secrets_masks:
154177
return
@@ -230,9 +253,8 @@ def list_(
230253
table.add_column("Created", justify="left", style="blue", overflow="fold")
231254
for secret in secrets:
232255
full_secret_name = secret.name
233-
secret_name = full_secret_name.split("/secrets/")[-1] # Removes project prefix
234-
# E.g. https://console.cloud.google.com/security/secret-manager/secret/SECRET_SOURCE-SHOPIFY__CREDS/versions?hl=en&project=<gcp_project_id>
235-
secret_url = f"https://console.cloud.google.com/security/secret-manager/secret/{secret_name}/versions?hl=en&project={gcp_project_id}"
256+
secret_name = _extract_secret_name(full_secret_name)
257+
secret_url = _get_secret_url(secret_name, gcp_project_id)
236258
table.add_row(
237259
f"[link={secret_url}]{secret_name}[/link]",
238260
"\n".join([f"{k}={v}" for k, v in secret.labels.items()]),
@@ -242,6 +264,43 @@ def list_(
242264
console.print(table)
243265

244266

267+
def _extract_secret_name(secret_name: str) -> str:
268+
"""Extract the secret name from a fully qualified secret path.
269+
270+
Handles different formats of secret names:
271+
- Full path: "projects/project-id/secrets/SECRET_NAME"
272+
- Already extracted: "SECRET_NAME"
273+
274+
Args:
275+
secret_name: The secret name or path
276+
277+
Returns:
278+
str: The extracted secret name without project prefix
279+
"""
280+
if "/secrets/" in secret_name:
281+
return secret_name.split("/secrets/")[-1]
282+
return secret_name
283+
284+
285+
def _get_secret_url(secret_name: str, gcp_project_id: str) -> str:
286+
"""Generate a URL for a secret in the GCP Secret Manager console.
287+
288+
Note: This URL itself does not contain secrets or sensitive information.
289+
The URL itself is only useful for valid logged-in users of the project, and it
290+
safe to print this URL in logs.
291+
292+
Args:
293+
secret_name: The name of the secret in GCP.
294+
gcp_project_id: The GCP project ID.
295+
296+
Returns:
297+
str: URL to the secret in the GCP console
298+
"""
299+
# Ensure we have just the secret name without the project prefix
300+
secret_name = _extract_secret_name(secret_name)
301+
return f"https://console.cloud.google.com/security/secret-manager/secret/{secret_name}/versions?hl=en&project={gcp_project_id}"
302+
303+
245304
def _fetch_secret_handles(
246305
connector_name: str,
247306
gcp_project_id: str = AIRBYTE_INTERNAL_GCP_PROJECT,
@@ -272,9 +331,44 @@ def _write_secret_file(
272331
secret: "Secret", # type: ignore
273332
client: "secretmanager.SecretManagerServiceClient", # type: ignore
274333
file_path: Path,
334+
connector_name: str,
335+
gcp_project_id: str,
275336
) -> None:
276-
version_name = f"{secret.name}/versions/latest"
277-
response = client.access_secret_version(name=version_name)
337+
"""Write the most recent enabled version of a secret to a file.
338+
339+
Lists all enabled versions of the secret and selects the most recent one.
340+
Raises ConnectorSecretWithNoValidVersionsError if no enabled versions are found.
341+
342+
Args:
343+
secret: The secret to write to a file
344+
client: The Secret Manager client
345+
file_path: The path to write the secret to
346+
connector_name: The name of the connector
347+
gcp_project_id: The GCP project ID
348+
349+
Raises:
350+
ConnectorSecretWithNoValidVersionsError: If no enabled version is found
351+
"""
352+
# List all enabled versions of the secret.
353+
response = client.list_secret_versions(
354+
request={"parent": secret.name, "filter": "state:ENABLED"}
355+
)
356+
357+
# The API returns versions pre-sorted in descending order, with the
358+
# 0th item being the latest version.
359+
versions = list(response)
360+
361+
if not versions:
362+
secret_name = _extract_secret_name(secret.name)
363+
raise ConnectorSecretWithNoValidVersionsError(
364+
connector_name=connector_name,
365+
secret_name=secret_name,
366+
gcp_project_id=gcp_project_id,
367+
)
368+
369+
enabled_version = versions[0]
370+
371+
response = client.access_secret_version(name=enabled_version.name)
278372
file_path.write_text(response.payload.data.decode("UTF-8"))
279373
file_path.chmod(0o600) # default to owner read/write only
280374

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
"""Exceptions for the Airbyte CDK CLI."""
3+
4+
from dataclasses import dataclass
5+
6+
7+
@dataclass(kw_only=True)
8+
class ConnectorSecretWithNoValidVersionsError(Exception):
9+
"""Error when a connector secret has no valid versions."""
10+
11+
connector_name: str
12+
secret_name: str
13+
gcp_project_id: str
14+
15+
def __str__(self) -> str:
16+
"""Return a string representation of the exception."""
17+
from airbyte_cdk.cli.airbyte_cdk._secrets import _get_secret_url
18+
19+
url = _get_secret_url(self.secret_name, self.gcp_project_id)
20+
return (
21+
f"No valid versions found for secret '{self.secret_name}' in connector '{self.connector_name}'. "
22+
f"Please check the following URL for more information:\n- {url}"
23+
)

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,12 +1421,22 @@ definitions:
14211421
default: ""
14221422
schema_loader:
14231423
title: Schema Loader
1424-
description: Component used to retrieve the schema for the current stream.
1424+
description:
1425+
One or many schema loaders can be used to retrieve the schema for the current stream. When
1426+
multiple schema loaders are defined, schema properties will be merged together. Schema
1427+
loaders defined first taking precedence in the event of a conflict.
14251428
anyOf:
14261429
- "$ref": "#/definitions/InlineSchemaLoader"
14271430
- "$ref": "#/definitions/DynamicSchemaLoader"
14281431
- "$ref": "#/definitions/JsonFileSchemaLoader"
14291432
- "$ref": "#/definitions/CustomSchemaLoader"
1433+
- type: array
1434+
items:
1435+
anyOf:
1436+
- "$ref": "#/definitions/InlineSchemaLoader"
1437+
- "$ref": "#/definitions/DynamicSchemaLoader"
1438+
- "$ref": "#/definitions/JsonFileSchemaLoader"
1439+
- "$ref": "#/definitions/CustomSchemaLoader"
14301440
# TODO we have move the transformation to the RecordSelector level in the code but kept this here for
14311441
# compatibility reason. We should eventually move this to align with the code.
14321442
transformations:
@@ -4362,4 +4372,4 @@ interpolation:
43624372
regex: The regular expression to search for. It must include a capture group.
43634373
return_type: str
43644374
examples:
4365-
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'
4375+
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -2151,6 +2153,14 @@ class Config:
21512153
DynamicSchemaLoader,
21522154
JsonFileSchemaLoader,
21532155
CustomSchemaLoader,
2156+
List[
2157+
Union[
2158+
InlineSchemaLoader,
2159+
DynamicSchemaLoader,
2160+
JsonFileSchemaLoader,
2161+
CustomSchemaLoader,
2162+
]
2163+
],
21542164
]
21552165
] = Field(
21562166
None,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@
504504
SchemaTypeIdentifier,
505505
TypesMap,
506506
)
507+
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
507508
from airbyte_cdk.sources.declarative.spec import Spec
508509
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
509510
from airbyte_cdk.sources.declarative.transformations import (
@@ -1914,9 +1915,25 @@ def create_declarative_stream(
19141915
else:
19151916
state_transformations = []
19161917

1917-
if model.schema_loader:
1918+
schema_loader: Union[
1919+
CompositeSchemaLoader,
1920+
DefaultSchemaLoader,
1921+
DynamicSchemaLoader,
1922+
InlineSchemaLoader,
1923+
JsonFileSchemaLoader,
1924+
]
1925+
if model.schema_loader and isinstance(model.schema_loader, list):
1926+
nested_schema_loaders = [
1927+
self._create_component_from_model(model=nested_schema_loader, config=config)
1928+
for nested_schema_loader in model.schema_loader
1929+
]
1930+
schema_loader = CompositeSchemaLoader(
1931+
schema_loaders=nested_schema_loaders, parameters={}
1932+
)
1933+
elif model.schema_loader:
19181934
schema_loader = self._create_component_from_model(
1919-
model=model.schema_loader, config=config
1935+
model=model.schema_loader, # type: ignore # If defined, schema_loader is guaranteed not to be a list and will be one of the existing base models
1936+
config=config,
19201937
)
19211938
else:
19221939
options = model.parameters or {}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import InitVar, dataclass
6+
from typing import Any, Dict, List, Mapping
7+
8+
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
9+
10+
11+
@dataclass
12+
class CompositeSchemaLoader(SchemaLoader):
13+
"""
14+
Schema loader that consists of multiple schema loaders that are combined into a single
15+
schema. Subsequent schemas do not overwrite existing values so the schema loaders with
16+
a higher priority should be defined first.
17+
"""
18+
19+
schema_loaders: List[SchemaLoader]
20+
parameters: InitVar[Mapping[str, Any]]
21+
22+
def get_json_schema(self) -> Mapping[str, Any]:
23+
combined_schema: Dict[str, Any] = {
24+
"$schema": "http://json-schema.org/draft-07/schema#",
25+
"type": ["null", "object"],
26+
"properties": {},
27+
}
28+
for schema_loader in self.schema_loaders:
29+
schema_properties = schema_loader.get_json_schema()["properties"]
30+
combined_schema["properties"] = {**schema_properties, **combined_schema["properties"]}
31+
return combined_schema

airbyte_cdk/test/entrypoint_wrapper.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ def records(self) -> List[AirbyteMessage]:
8282
def state_messages(self) -> List[AirbyteMessage]:
8383
return self._get_message_by_types([Type.STATE])
8484

85+
@property
86+
def spec_messages(self) -> List[AirbyteMessage]:
87+
return self._get_message_by_types([Type.SPEC])
88+
8589
@property
8690
def connection_status_messages(self) -> List[AirbyteMessage]:
8791
return self._get_message_by_types([Type.CONNECTION_STATUS])

airbyte_cdk/test/standard_tests/_job_runner.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,15 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
6161

6262
def run_test_job(
6363
connector: IConnector | type[IConnector] | Callable[[], IConnector],
64-
verb: Literal["read", "check", "discover"],
65-
test_scenario: ConnectorTestScenario,
64+
verb: Literal["spec", "read", "check", "discover"],
6665
*,
66+
test_scenario: ConnectorTestScenario | None = None,
6767
catalog: ConfiguredAirbyteCatalog | dict[str, Any] | None = None,
6868
) -> entrypoint_wrapper.EntrypointOutput:
6969
"""Run a test scenario from provided CLI args and return the result."""
70+
# Use default (empty) scenario if not provided:
71+
test_scenario = test_scenario or ConnectorTestScenario()
72+
7073
if not connector:
7174
raise ValueError("Connector is required")
7275

@@ -86,14 +89,14 @@ def run_test_job(
8689
)
8790

8891
args: list[str] = [verb]
89-
if test_scenario.config_path:
90-
args += ["--config", str(test_scenario.config_path)]
91-
elif test_scenario.config_dict:
92+
config_dict = test_scenario.get_config_dict(empty_if_missing=True)
93+
if config_dict and verb != "spec":
94+
# Write the config to a temp json file and pass the path to the file as an argument.
9295
config_path = (
9396
Path(tempfile.gettempdir()) / "airbyte-test" / f"temp_config_{uuid.uuid4().hex}.json"
9497
)
9598
config_path.parent.mkdir(parents=True, exist_ok=True)
96-
config_path.write_text(orjson.dumps(test_scenario.config_dict).decode())
99+
config_path.write_text(orjson.dumps(config_dict).decode())
97100
args += ["--config", str(config_path)]
98101

99102
catalog_path: Path | None = None

0 commit comments

Comments
 (0)