Skip to content

Commit e3d3828

Browse files
author
maxi297
committed
Merge branch 'main' into maxi297/filebased-fix-column-named-type
2 parents b6129a3 + 450a845 commit e3d3828

20 files changed

+524
-141
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
name: Dependency Analysis
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
paths:
8+
- "airbyte_cdk/**"
9+
- "poetry.lock"
10+
- "pyproject.toml"
11+
pull_request:
12+
paths:
13+
- "airbyte_cdk/**"
14+
- "poetry.lock"
15+
- "pyproject.toml"
16+
17+
jobs:
18+
dependency-analysis:
19+
name: Dependency Analysis with Deptry
20+
runs-on: ubuntu-24.04
21+
steps:
22+
- name: Checkout code
23+
uses: actions/checkout@v4
24+
- name: Set up Python
25+
uses: actions/setup-python@v5
26+
with:
27+
python-version: '3.10'
28+
- name: Set up Poetry
29+
uses: Gr1N/setup-poetry@v9
30+
with:
31+
poetry-version: "2.0.1"
32+
- name: Install dependencies
33+
run: poetry install --all-extras
34+
35+
# Job-specific step(s):
36+
- name: Run Deptry
37+
run: |
38+
poetry run deptry .

airbyte_cdk/sources/declarative/async_job/job_tracker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ class ConcurrentJobLimitReached(Exception):
1717
class JobTracker:
1818
def __init__(self, limit: int):
1919
self._jobs: Set[str] = set()
20-
self._limit = limit
20+
if limit < 1:
21+
LOGGER.warning(
22+
f"The `max_concurrent_async_job_count` property is less than 1: {limit}. Setting to 1. Please update the source manifest to set a valid value."
23+
)
24+
self._limit = 1 if limit < 1 else limit
2125
self._lock = threading.Lock()
2226

2327
def try_to_get_intent(self) -> str:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ properties:
4242
"$ref": "#/definitions/ConcurrencyLevel"
4343
api_budget:
4444
"$ref": "#/definitions/HTTPAPIBudget"
45+
max_concurrent_async_job_count:
46+
title: Maximum Concurrent Asynchronous Jobs
47+
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
48+
type: integer
4549
metadata:
4650
type: object
4751
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
@@ -109,7 +113,7 @@ definitions:
109113
description: List of transformations (path and corresponding value) that will be added to the record.
110114
type: array
111115
items:
112-
- "$ref": "#/definitions/AddedFieldDefinition"
116+
"$ref": "#/definitions/AddedFieldDefinition"
113117
$parameters:
114118
type: object
115119
additionalProperties: true
@@ -1657,7 +1661,7 @@ definitions:
16571661
description: List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).
16581662
type: array
16591663
items:
1660-
- type: string
1664+
type: string
16611665
interpolation_context:
16621666
- config
16631667
examples:
@@ -2073,29 +2077,29 @@ definitions:
20732077
type: array
20742078
default: []
20752079
items:
2076-
- type: string
2080+
type: string
20772081
interpolation_context:
20782082
- config
20792083
key_pointer:
20802084
title: Key Path
20812085
description: List of potentially nested fields describing the full path of the field key to extract.
20822086
type: array
20832087
items:
2084-
- type: string
2088+
type: string
20852089
interpolation_context:
20862090
- config
20872091
type_pointer:
20882092
title: Type Path
20892093
description: List of potentially nested fields describing the full path of the field type to extract.
20902094
type: array
20912095
items:
2092-
- type: string
2096+
type: string
20932097
interpolation_context:
20942098
- config
20952099
types_mapping:
20962100
type: array
20972101
items:
2098-
- "$ref": "#/definitions/TypesMap"
2102+
"$ref": "#/definitions/TypesMap"
20992103
$parameters:
21002104
type: object
21012105
additionalProperties: true
@@ -2251,7 +2255,7 @@ definitions:
22512255
description: A path to field that needs to be flattened.
22522256
type: array
22532257
items:
2254-
- type: string
2258+
type: string
22552259
examples:
22562260
- ["data"]
22572261
- ["data", "*", "field"]
@@ -3526,7 +3530,7 @@ definitions:
35263530
description: A list of potentially nested fields indicating the full path where value will be added or updated.
35273531
type: array
35283532
items:
3529-
- type: string
3533+
type: string
35303534
interpolation_context:
35313535
- config
35323536
- components_values
@@ -3602,7 +3606,7 @@ definitions:
36023606
description: A list of potentially nested fields indicating the full path in source config file where streams configs located.
36033607
type: array
36043608
items:
3605-
- type: string
3609+
type: string
36063610
interpolation_context:
36073611
- parameters
36083612
examples:

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ def __init__(
9393
self._constructor = (
9494
component_factory
9595
if component_factory
96-
else ModelToComponentFactory(emit_connector_builder_messages)
96+
else ModelToComponentFactory(
97+
emit_connector_builder_messages,
98+
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
99+
)
97100
)
98101
self._message_repository = self._constructor.get_message_repository()
99102
self._slice_logger: SliceLogger = (

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1871,6 +1871,11 @@ class Config:
18711871
spec: Optional[Spec] = None
18721872
concurrency_level: Optional[ConcurrencyLevel] = None
18731873
api_budget: Optional[HTTPAPIBudget] = None
1874+
max_concurrent_async_job_count: Optional[int] = Field(
1875+
None,
1876+
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
1877+
title="Maximum Concurrent Asynchronous Jobs",
1878+
)
18741879
metadata: Optional[Dict[str, Any]] = Field(
18751880
None,
18761881
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
@@ -1898,6 +1903,11 @@ class Config:
18981903
spec: Optional[Spec] = None
18991904
concurrency_level: Optional[ConcurrencyLevel] = None
19001905
api_budget: Optional[HTTPAPIBudget] = None
1906+
max_concurrent_async_job_count: Optional[int] = Field(
1907+
None,
1908+
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
1909+
title="Maximum Concurrent Asynchronous Jobs",
1910+
)
19011911
metadata: Optional[Dict[str, Any]] = Field(
19021912
None,
19031913
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@
503503
IncrementingCountStreamStateConverter,
504504
)
505505
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
506-
from airbyte_cdk.sources.types import Config
506+
from airbyte_cdk.sources.types import Config, ConnectionDefinition
507507
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
508508

509509
ComponentDefinition = Mapping[str, Any]
@@ -527,6 +527,7 @@ def __init__(
527527
disable_resumable_full_refresh: bool = False,
528528
message_repository: Optional[MessageRepository] = None,
529529
connector_state_manager: Optional[ConnectorStateManager] = None,
530+
max_concurrent_async_job_count: Optional[int] = None,
530531
):
531532
self._init_mappings()
532533
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
@@ -540,6 +541,7 @@ def __init__(
540541
)
541542
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
542543
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
544+
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
543545

544546
def _init_mappings(self) -> None:
545547
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
@@ -2928,8 +2930,7 @@ def _get_download_retriever() -> SimpleRetrieverTestReadDecorator | SimpleRetrie
29282930
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
29292931
job_repository,
29302932
stream_slices,
2931-
JobTracker(1),
2932-
# FIXME eventually make the number of concurrent jobs in the API configurable. Until then, we limit to 1
2933+
self._job_tracker,
29332934
self._message_repository,
29342935
has_bulk_parent=False,
29352936
# FIXME work would need to be done here in order to detect if a stream as a parent stream that is bulk

airbyte_cdk/sources/file_based/file_based_source.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
FileBasedErrorsCollector,
4949
FileBasedSourceError,
5050
)
51+
from airbyte_cdk.sources.file_based.file_based_stream_permissions_reader import (
52+
AbstractFileBasedStreamPermissionsReader,
53+
)
5154
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
5255
from airbyte_cdk.sources.file_based.file_types import default_parsers
5356
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
@@ -100,8 +103,10 @@ def __init__(
100103
cursor_cls: Type[
101104
Union[AbstractConcurrentFileBasedCursor, AbstractFileBasedCursor]
102105
] = FileBasedConcurrentCursor,
106+
stream_permissions_reader: Optional[AbstractFileBasedStreamPermissionsReader] = None,
103107
):
104108
self.stream_reader = stream_reader
109+
self.stream_permissions_reader = stream_permissions_reader
105110
self.spec_class = spec_class
106111
self.config = config
107112
self.catalog = catalog
@@ -234,6 +239,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
234239
try:
235240
parsed_config = self._get_parsed_config(config)
236241
self.stream_reader.config = parsed_config
242+
if self.stream_permissions_reader:
243+
self.stream_permissions_reader.config = parsed_config
237244
streams: List[Stream] = []
238245
for stream_config in parsed_config.streams:
239246
# Like state_manager, `catalog_stream` may be None during `check`
@@ -337,9 +344,23 @@ def _make_default_stream(
337344
preserve_directory_structure=preserve_directory_structure(parsed_config),
338345
)
339346

347+
def _ensure_permissions_reader_available(self) -> None:
348+
"""
349+
Validates that a stream permissions reader is available.
350+
Raises a ValueError if the reader is not provided.
351+
"""
352+
if not self.stream_permissions_reader:
353+
raise ValueError(
354+
"Stream permissions reader is required for streams that use permissions transfer mode."
355+
)
356+
340357
def _make_permissions_stream(
341358
self, stream_config: FileBasedStreamConfig, cursor: Optional[AbstractFileBasedCursor]
342359
) -> AbstractFileBasedStream:
360+
"""
361+
Creates a stream that reads permissions from files.
362+
"""
363+
self._ensure_permissions_reader_available()
343364
return PermissionsFileBasedStream(
344365
config=stream_config,
345366
catalog_schema=self.stream_schemas.get(stream_config.name),
@@ -350,6 +371,7 @@ def _make_permissions_stream(
350371
validation_policy=self._validate_and_get_validation_policy(stream_config),
351372
errors_collector=self.errors_collector,
352373
cursor=cursor,
374+
stream_permissions_reader=self.stream_permissions_reader, # type: ignore
353375
)
354376

355377
def _make_file_based_stream(
@@ -370,9 +392,10 @@ def _make_file_based_stream(
370392
def _make_identities_stream(
371393
self,
372394
) -> Stream:
395+
self._ensure_permissions_reader_available()
373396
return FileIdentitiesStream(
374397
catalog_schema=self.stream_schemas.get(FileIdentitiesStream.IDENTITIES_STREAM_NAME),
375-
stream_reader=self.stream_reader,
398+
stream_permissions_reader=self.stream_permissions_reader, # type: ignore
376399
discovery_policy=self.discovery_policy,
377400
errors_collector=self.errors_collector,
378401
)

0 commit comments

Comments
 (0)