|
65 | 65 | from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel |
66 | 66 | from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel |
67 | 67 | from airbyte_cdk.sources.declarative.models import DefaultPaginator as DefaultPaginatorModel |
| 68 | +from airbyte_cdk.sources.declarative.models import ( |
| 69 | + GroupingPartitionRouter as GroupingPartitionRouterModel, |
| 70 | +) |
68 | 71 | from airbyte_cdk.sources.declarative.models import HttpRequester as HttpRequesterModel |
69 | 72 | from airbyte_cdk.sources.declarative.models import JwtAuthenticator as JwtAuthenticatorModel |
70 | 73 | from airbyte_cdk.sources.declarative.models import ListPartitionRouter as ListPartitionRouterModel |
|
96 | 99 | from airbyte_cdk.sources.declarative.partition_routers import ( |
97 | 100 | AsyncJobPartitionRouter, |
98 | 101 | CartesianProductStreamSlicer, |
| 102 | + GroupingPartitionRouter, |
99 | 103 | ListPartitionRouter, |
100 | 104 | SinglePartitionRouter, |
101 | 105 | SubstreamPartitionRouter, |
@@ -3840,3 +3844,59 @@ def test_api_budget_fixed_window_policy(): |
3840 | 3844 | assert matcher._method == "GET" |
3841 | 3845 | assert matcher._url_base == "https://example.org" |
3842 | 3846 | assert matcher._url_path_pattern.pattern == "/v2/data" |
| 3847 | + |
| 3848 | + |
| 3849 | +def test_create_grouping_partition_router_with_underlying_router(): |
| 3850 | + content = """ |
| 3851 | + schema_loader: |
| 3852 | + file_path: "./source_example/schemas/{{ parameters['name'] }}.yaml" |
| 3853 | + name: "{{ parameters['stream_name'] }}" |
| 3854 | + retriever: |
| 3855 | + requester: |
| 3856 | + type: "HttpRequester" |
| 3857 | + path: "example" |
| 3858 | + record_selector: |
| 3859 | + extractor: |
| 3860 | + field_path: [] |
| 3861 | + stream_A: |
| 3862 | + type: DeclarativeStream |
| 3863 | + name: "A" |
| 3864 | + primary_key: "id" |
| 3865 | + $parameters: |
| 3866 | + retriever: "#/retriever" |
| 3867 | + url_base: "https://airbyte.io" |
| 3868 | + schema_loader: "#/schema_loader" |
| 3869 | + sub_partition_router: |
| 3870 | + type: SubstreamPartitionRouter |
| 3871 | + parent_stream_configs: |
| 3872 | + - stream: "#/stream_A" |
| 3873 | + parent_key: id |
| 3874 | + partition_field: repository_id |
| 3875 | + partition_router: |
| 3876 | + type: GroupingPartitionRouter |
| 3877 | + underlying_partition_router: "#/sub_partition_router" |
| 3878 | + group_size: 2 |
| 3879 | + """ |
| 3880 | + parsed_manifest = YamlDeclarativeSource._parse(content) |
| 3881 | + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) |
| 3882 | + partition_router_manifest = transformer.propagate_types_and_parameters( |
| 3883 | + "", resolved_manifest["partition_router"], {} |
| 3884 | + ) |
| 3885 | + |
| 3886 | + partition_router = factory.create_component( |
| 3887 | + model_type=GroupingPartitionRouterModel, |
| 3888 | + component_definition=partition_router_manifest, |
| 3889 | + config=input_config, |
| 3890 | + ) |
| 3891 | + |
| 3892 | + # Test the created partition router |
| 3893 | + assert isinstance(partition_router, GroupingPartitionRouter) |
| 3894 | + assert isinstance(partition_router.underlying_partition_router, SubstreamPartitionRouter) |
| 3895 | + assert partition_router.group_size == 2 |
| 3896 | + |
| 3897 | + # Test the underlying partition router |
| 3898 | + parent_stream_configs = partition_router.underlying_partition_router.parent_stream_configs |
| 3899 | + assert len(parent_stream_configs) == 1 |
| 3900 | + assert isinstance(parent_stream_configs[0].stream, DeclarativeStream) |
| 3901 | + assert parent_stream_configs[0].parent_key.eval({}) == "id" |
| 3902 | + assert parent_stream_configs[0].partition_field.eval({}) == "repository_id" |
0 commit comments