Skip to content

Commit fb51377

Browse files
authored
Merge branch 'aj/feat/add-standard-tests-cli' into aj/feat/add-cli-build-command
2 parents b0ba703 + 9ea97fa commit fb51377

File tree

46 files changed

+1520
-358
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1520
-358
lines changed

.github/workflows/connector-tests.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,11 @@ jobs:
7575
# Chargebee is being flaky:
7676
# - connector: source-chargebee
7777
# cdk_extra: n/a
78-
# These two are behind in CDK updates and can't be used as tests until they are updated:
79-
# - connector: source-s3
80-
# cdk_extra: file-based
78+
# This one is behind in CDK updates and can't be used as a test until it is updated:
8179
# - connector: destination-pinecone
8280
# cdk_extra: vector-db-based
81+
- connector: source-google-drive
82+
cdk_extra: file-based
8383
- connector: destination-motherduck
8484
cdk_extra: sql
8585
# ZenDesk currently failing (as of 2024-12-02)

.github/workflows/poe-command.yml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
name: On-Demand Poe Task
2+
3+
on:
4+
workflow_dispatch:
5+
inputs:
6+
comment-id:
7+
description: "Optional comment-id of the slash command. Ignore if not applicable."
8+
required: false
9+
pr:
10+
description: "PR Number"
11+
required: false
12+
13+
permissions:
14+
contents: write
15+
pull-requests: write
16+
17+
jobs:
18+
run-poe-command:
19+
env:
20+
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
21+
runs-on: ubuntu-latest
22+
steps:
23+
- name: Run Poe Slash Command Processor
24+
uses: aaronsteers/poe-command-processor@v1
25+
with:
26+
pr: ${{ github.event.inputs.pr }}
27+
comment-id: ${{ github.event.inputs.comment-id }}
28+
github-token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}

.github/workflows/slash_command_dispatch.yml

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@ on:
66

77
jobs:
88
slashCommandDispatch:
9-
# Only allow slash commands on pull request (not on issues)
10-
if: ${{ github.event.issue.pull_request }}
119
runs-on: ubuntu-24.04
1210
steps:
1311
- name: Slash Command Dispatch
@@ -19,17 +17,25 @@ jobs:
1917
repository: ${{ github.repository }}
2018
token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
2119
dispatch-type: workflow
22-
issue-type: pull-request
20+
issue-type: both
21+
22+
# Only run for users with 'write' permission on the main repository
23+
permission: write
24+
2325
commands: |
2426
autofix
2527
test
2628
poetry-lock
29+
poe
30+
31+
# Notes regarding static-args:
32+
# - Slash commands can be invoked from both issues and comments.
33+
# - If the slash command is invoked from an issue, we intentionally pass 'null' as the PR number.
34+
# - Comment ID will always be sent, and this is sufficient to post back status updates to the originating comment.
2735
static-args: |
28-
pr=${{ github.event.issue.number }}
36+
pr=${{ github.event.issue.pull_request != null && github.event.issue.number || '' }}
2937
comment-id=${{ github.event.comment.id }}
3038
31-
# Only run for users with 'write' permission on the main repository
32-
permission: write
3339
3440
- name: Edit comment with error message
3541
if: steps.dispatch.outputs.error-message

airbyte_cdk/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
AirbyteMessage,
2020
AirbyteProtocol,
2121
AirbyteRecordMessage,
22+
AirbyteRecordMessageFileReference,
2223
AirbyteStateBlob,
2324
AirbyteStateMessage,
2425
AirbyteStateStats,

airbyte_cdk/models/airbyte_protocol.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
from airbyte_protocol_dataclasses.models import * # noqa: F403 # Allow '*'
99
from serpyco_rs.metadata import Alias
1010

11-
from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
12-
1311
# ruff: noqa: F405 # ignore fuzzy import issues with 'import *'
1412

1513

@@ -84,7 +82,7 @@ class AirbyteMessage:
8482
spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined]
8583
connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined]
8684
catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined]
87-
record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None # type: ignore [name-defined]
85+
record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined]
8886
state: Optional[AirbyteStateMessage] = None
8987
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
9088
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]

airbyte_cdk/models/file_transfer_record_message.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
149149
message = stream_data_to_airbyte_message(
150150
stream_name=record.stream_name,
151151
data_or_message=record.data,
152-
is_file_transfer_message=record.is_file_transfer_message,
152+
file_reference=record.file_reference,
153153
)
154154
stream = self._stream_name_to_instance[record.stream_name]
155155

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
PerPartitionWithGlobalCursor,
2929
)
3030
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
31+
from airbyte_cdk.sources.declarative.models import FileUploader
3132
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3233
ConcurrencyLevel as ConcurrencyLevelModel,
3334
)
@@ -209,6 +210,11 @@ def _group_streams(
209210
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
210211
# so we need to treat them as synchronous
211212

213+
supports_file_transfer = (
214+
isinstance(declarative_stream, DeclarativeStream)
215+
and "file_uploader" in name_to_stream_mapping[declarative_stream.name]
216+
)
217+
212218
if (
213219
isinstance(declarative_stream, DeclarativeStream)
214220
and name_to_stream_mapping[declarative_stream.name]["type"]
@@ -325,6 +331,7 @@ def _group_streams(
325331
else None,
326332
logger=self.logger,
327333
cursor=cursor,
334+
supports_file_transfer=supports_file_transfer,
328335
)
329336
)
330337
elif (
@@ -356,6 +363,7 @@ def _group_streams(
356363
cursor_field=None,
357364
logger=self.logger,
358365
cursor=final_state_cursor,
366+
supports_file_transfer=supports_file_transfer,
359367
)
360368
)
361369
elif (
@@ -410,6 +418,7 @@ def _group_streams(
410418
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
411419
logger=self.logger,
412420
cursor=perpartition_cursor,
421+
supports_file_transfer=supports_file_transfer,
413422
)
414423
)
415424
else:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,6 +1448,42 @@ definitions:
14481448
- "$ref": "#/definitions/LegacyToPerPartitionStateMigration"
14491449
- "$ref": "#/definitions/CustomStateMigration"
14501450
default: []
1451+
file_uploader:
1452+
title: File Uploader
1453+
description: (experimental) Describes how to fetch a file
1454+
type: object
1455+
required:
1456+
- type
1457+
- requester
1458+
- download_target_extractor
1459+
properties:
1460+
type:
1461+
type: string
1462+
enum: [ FileUploader ]
1463+
requester:
1464+
description: Requester component that describes how to prepare HTTP requests to send to the source API.
1465+
anyOf:
1466+
- "$ref": "#/definitions/CustomRequester"
1467+
- "$ref": "#/definitions/HttpRequester"
1468+
download_target_extractor:
1469+
description: Responsible for fetching the URL where the file is located. This is applied on each record and not on the HTTP response
1470+
anyOf:
1471+
- "$ref": "#/definitions/CustomRecordExtractor"
1472+
- "$ref": "#/definitions/DpathExtractor"
1473+
file_extractor:
1474+
description: Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content
1475+
anyOf:
1476+
- "$ref": "#/definitions/CustomRecordExtractor"
1477+
- "$ref": "#/definitions/DpathExtractor"
1478+
filename_extractor:
1479+
description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.
1480+
type: string
1481+
interpolation_context:
1482+
- config
1483+
- record
1484+
examples:
1485+
- "{{ record.id }}/{{ record.file_name }}/"
1486+
- "{{ record.id }}_{{ record.file_name }}/"
14511487
$parameters:
14521488
type: object
14531489
additional_properties: true

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1717
from airbyte_cdk.sources.declarative.models import SchemaNormalization
18+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
1819
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1920
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
2021
from airbyte_cdk.sources.utils.transform import TypeTransformer
@@ -42,6 +43,7 @@ class RecordSelector(HttpSelector):
4243
record_filter: Optional[RecordFilter] = None
4344
transformations: List[RecordTransformation] = field(default_factory=lambda: [])
4445
transform_before_filtering: bool = False
46+
file_uploader: Optional[FileUploader] = None
4547

4648
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4749
self._parameters = parameters
@@ -117,7 +119,10 @@ def filter_and_transform(
117119
transformed_filtered_data, schema=records_schema
118120
)
119121
for data in normalized_data:
120-
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
122+
record = Record(data=data, stream_name=self.name, associated_slice=stream_slice)
123+
if self.file_uploader:
124+
self.file_uploader.upload(record)
125+
yield record
121126

122127
def _normalize_by_schema(
123128
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]

0 commit comments

Comments
 (0)