Skip to content

Commit d1d5107

Browse files
author
maxi297
committed
PoC for file upload
1 parent c4d0f91 commit d1d5107

File tree

9 files changed

+427
-35
lines changed

9 files changed

+427
-35
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
PerPartitionWithGlobalCursor,
2626
)
2727
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
28+
from airbyte_cdk.sources.declarative.models import FileUploader
2829
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
2930
ConcurrencyLevel as ConcurrencyLevelModel,
3031
)
@@ -206,6 +207,14 @@ def _group_streams(
206207
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
207208
# so we need to treat them as synchronous
208209

210+
file_uploader = None
211+
if isinstance(declarative_stream, DeclarativeStream):
212+
file_uploader = self._constructor.create_component(
213+
model_type=FileUploader,
214+
component_definition=name_to_stream_mapping[declarative_stream.name]["file_uploader"],
215+
config=config,
216+
) if "file_uploader" in name_to_stream_mapping[declarative_stream.name] else None
217+
209218
if (
210219
isinstance(declarative_stream, DeclarativeStream)
211220
and name_to_stream_mapping[declarative_stream.name]["type"]
@@ -273,6 +282,7 @@ def _group_streams(
273282
declarative_stream.get_json_schema(),
274283
retriever,
275284
self.message_repository,
285+
file_uploader,
276286
),
277287
stream_slicer=declarative_stream.retriever.stream_slicer,
278288
)
@@ -303,6 +313,7 @@ def _group_streams(
303313
declarative_stream.get_json_schema(),
304314
retriever,
305315
self.message_repository,
316+
file_uploader,
306317
),
307318
stream_slicer=cursor,
308319
)
@@ -333,6 +344,7 @@ def _group_streams(
333344
declarative_stream.get_json_schema(),
334345
declarative_stream.retriever,
335346
self.message_repository,
347+
file_uploader,
336348
),
337349
declarative_stream.retriever.stream_slicer,
338350
)
@@ -392,6 +404,7 @@ def _group_streams(
392404
declarative_stream.get_json_schema(),
393405
retriever,
394406
self.message_repository,
407+
file_uploader,
395408
),
396409
perpartition_cursor,
397410
)

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1422,6 +1422,33 @@ definitions:
14221422
- "$ref": "#/definitions/LegacyToPerPartitionStateMigration"
14231423
- "$ref": "#/definitions/CustomStateMigration"
14241424
default: []
1425+
file_uploader:
1426+
title: File Uploader
1427+
description: (experimental) Describes how to fetch a file
1428+
type: object
1429+
required:
1430+
- type
1431+
- requester
1432+
- download_target_extractor
1433+
properties:
1434+
type:
1435+
type: string
1436+
enum: [ FileUploader ]
1437+
requester:
1438+
description: Requester component that describes how to prepare HTTP requests to send to the source API.
1439+
anyOf:
1440+
- "$ref": "#/definitions/CustomRequester"
1441+
- "$ref": "#/definitions/HttpRequester"
1442+
download_target_extractor:
1443+
description: Responsible for fetching the URL where the file is located. This is applied to each record and not to the HTTP response
1444+
anyOf:
1445+
- "$ref": "#/definitions/CustomRecordExtractor"
1446+
- "$ref": "#/definitions/DpathExtractor"
1447+
file_extractor:
1448+
description: Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content
1449+
anyOf:
1450+
- "$ref": "#/definitions/CustomRecordExtractor"
1451+
- "$ref": "#/definitions/DpathExtractor"
14251452
$parameters:
14261453
type: object
14271454
additional_properties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 76 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -609,7 +609,9 @@ class OAuthAuthenticator(BaseModel):
609609
scopes: Optional[List[str]] = Field(
610610
None,
611611
description="List of scopes that should be granted to the access token.",
612-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
612+
examples=[
613+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
614+
],
613615
title="Scopes",
614616
)
615617
token_expiry_date: Optional[str] = Field(
@@ -1083,24 +1085,28 @@ class OAuthConfigSpecification(BaseModel):
10831085
class Config:
10841086
extra = Extra.allow
10851087

1086-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
1087-
None,
1088-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1089-
examples=[
1090-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1091-
{
1092-
"app_id": {
1093-
"type": "string",
1094-
"path_in_connector_config": ["info", "app_id"],
1095-
}
1096-
},
1097-
],
1098-
title="OAuth user input",
1088+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
1089+
Field(
1090+
None,
1091+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1092+
examples=[
1093+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1094+
{
1095+
"app_id": {
1096+
"type": "string",
1097+
"path_in_connector_config": ["info", "app_id"],
1098+
}
1099+
},
1100+
],
1101+
title="OAuth user input",
1102+
)
10991103
)
1100-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
1101-
None,
1102-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1103-
title="DeclarativeOAuth Connector Specification",
1104+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
1105+
Field(
1106+
None,
1107+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1108+
title="DeclarativeOAuth Connector Specification",
1109+
)
11041110
)
11051111
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
11061112
None,
@@ -1118,7 +1124,9 @@ class Config:
11181124
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
11191125
None,
11201126
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
1121-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
1127+
examples=[
1128+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
1129+
],
11221130
title="OAuth input specification",
11231131
)
11241132
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1781,7 +1789,9 @@ class RecordSelector(BaseModel):
17811789
description="Responsible for filtering records to be emitted by the Source.",
17821790
title="Record Filter",
17831791
)
1784-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
1792+
schema_normalization: Optional[
1793+
Union[SchemaNormalization, CustomSchemaNormalization]
1794+
] = Field(
17851795
SchemaNormalization.None_,
17861796
description="Responsible for normalization according to the schema.",
17871797
title="Schema Normalization",
@@ -2006,7 +2016,9 @@ class Config:
20062016
description="Component used to fetch data incrementally based on a time field in the data.",
20072017
title="Incremental Sync",
20082018
)
2009-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
2019+
name: Optional[str] = Field(
2020+
"", description="The stream name.", example=["Users"], title="Name"
2021+
)
20102022
primary_key: Optional[PrimaryKey] = Field(
20112023
"", description="The primary key of the stream.", title="Primary Key"
20122024
)
@@ -2264,7 +2276,9 @@ class ParentStreamConfig(BaseModel):
22642276

22652277
class StateDelegatingStream(BaseModel):
22662278
type: Literal["StateDelegatingStream"]
2267-
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2279+
name: str = Field(
2280+
..., description="The stream name.", example=["Users"], title="Name"
2281+
)
22682282
full_refresh_stream: DeclarativeStream = Field(
22692283
...,
22702284
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2278,6 +2292,22 @@ class StateDelegatingStream(BaseModel):
22782292
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22792293

22802294

2295+
class FileUploader(BaseModel):
2296+
type: Literal["FileUploader"]
2297+
requester: Union[CustomRequester, HttpRequester] = Field(
2298+
...,
2299+
description="Requester component that describes how to prepare HTTP requests to send to the source API.",
2300+
)
2301+
download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
2302+
...,
2303+
description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
2304+
)
2305+
file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
2306+
None,
2307+
description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
2308+
)
2309+
2310+
22812311
class SimpleRetriever(BaseModel):
22822312
type: Literal["SimpleRetriever"]
22832313
record_selector: RecordSelector = Field(
@@ -2301,13 +2331,22 @@ class SimpleRetriever(BaseModel):
23012331
CustomPartitionRouter,
23022332
ListPartitionRouter,
23032333
SubstreamPartitionRouter,
2304-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2334+
List[
2335+
Union[
2336+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2337+
]
2338+
],
23052339
]
23062340
] = Field(
23072341
[],
23082342
description="PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.",
23092343
title="Partition Router",
23102344
)
2345+
file_uploader: Optional[FileUploader] = Field(
2346+
None,
2347+
description="(experimental) Describes how to fetch a file",
2348+
title="File Uploader",
2349+
)
23112350
decoder: Optional[
23122351
Union[
23132352
CustomDecoder,
@@ -2345,7 +2384,9 @@ class AsyncRetriever(BaseModel):
23452384
)
23462385
download_extractor: Optional[
23472386
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
2348-
] = Field(None, description="Responsible for fetching the records from provided urls.")
2387+
] = Field(
2388+
None, description="Responsible for fetching the records from provided urls."
2389+
)
23492390
creation_requester: Union[CustomRequester, HttpRequester] = Field(
23502391
...,
23512392
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2383,7 +2424,11 @@ class AsyncRetriever(BaseModel):
23832424
CustomPartitionRouter,
23842425
ListPartitionRouter,
23852426
SubstreamPartitionRouter,
2386-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2427+
List[
2428+
Union[
2429+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2430+
]
2431+
],
23872432
]
23882433
] = Field(
23892434
[],
@@ -2451,10 +2496,12 @@ class DynamicDeclarativeStream(BaseModel):
24512496
stream_template: DeclarativeStream = Field(
24522497
..., description="Reference to the stream template.", title="Stream Template"
24532498
)
2454-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2455-
...,
2456-
description="Component resolve and populates stream templates with components values.",
2457-
title="Components Resolver",
2499+
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
2500+
Field(
2501+
...,
2502+
description="Component resolve and populates stream templates with components values.",
2503+
title="Components Resolver",
2504+
)
24582505
)
24592506

24602507

0 commit comments

Comments
 (0)