Skip to content

Commit 447ace6

Browse files
author
maxi297
committed
TMP cdk features for the release of bing-ads to low-code
1 parent 4a96a2a commit 447ace6

File tree

6 files changed

+160
-115
lines changed

6 files changed

+160
-115
lines changed

airbyte_cdk/sources/declarative/auth/oauth.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
4+
import logging
55
from dataclasses import InitVar, dataclass, field
66
from datetime import datetime, timedelta
7-
from typing import Any, List, Mapping, MutableMapping, Optional, Union
7+
from typing import Any, List, Mapping, Optional, Union
88

99
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
1010
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
@@ -19,6 +19,8 @@
1919
)
2020
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse
2121

22+
logger = logging.getLogger("airbyte")
23+
2224

2325
@dataclass
2426
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator):
@@ -201,7 +203,8 @@ def get_client_secret(self) -> str:
201203
self._client_secret.eval(self.config) if self._client_secret else self._client_secret
202204
)
203205
if not client_secret:
204-
raise ValueError("OAuthAuthenticator was unable to evaluate client_secret parameter")
206+
# We've seen some APIs allowing empty client_secret so we will only log here
207+
logger.warning("OAuthAuthenticator was unable to evaluate client_secret parameter hence it'll be empty")
205208
return client_secret # type: ignore # value will be returned as a string, or an error will be raised
206209

207210
def get_refresh_token_name(self) -> str:

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def __init__(
8888
emit_connector_builder_messages=emit_connector_builder_messages,
8989
disable_resumable_full_refresh=True,
9090
connector_state_manager=self._connector_state_manager,
91+
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
9192
)
9293

9394
super().__init__(

airbyte_cdk/sources/declarative/requesters/http_job_repository.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -320,14 +320,13 @@ def _get_polling_response_interpolation_context(self, job: AsyncJob) -> Dict[str
320320
return polling_response_context
321321

322322
def _get_create_job_stream_slice(self, job: AsyncJob) -> StreamSlice:
323-
stream_slice = StreamSlice(
324-
partition={},
325-
cursor_slice={},
326-
extra_fields={
323+
return StreamSlice(
324+
partition=job.job_parameters().partition,
325+
cursor_slice=job.job_parameters().cursor_slice,
326+
extra_fields=dict(job.job_parameters().extra_fields) | {
327327
"creation_response": self._get_creation_response_interpolation_context(job),
328328
},
329329
)
330-
return stream_slice
331330

332331
def _get_download_targets(self, job: AsyncJob) -> Iterable[str]:
333332
if not self.download_target_requester:

unit_tests/sources/declarative/auth/test_oauth.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import base64
66
import json
77
import logging
8+
from copy import deepcopy
89
from datetime import timedelta, timezone
910
from unittest.mock import Mock
1011

@@ -128,6 +129,20 @@ def test_refresh_with_encode_config_params(self):
128129
}
129130
assert body == expected
130131

132+
def test_client_secret_empty(self):
133+
config_without_client_secret = deepcopy(config)
134+
del config_without_client_secret["client_secret"]
135+
oauth = DeclarativeOauth2Authenticator(
136+
token_refresh_endpoint="{{ config['refresh_endpoint'] }}",
137+
client_id="{{ config['client_id'] }}",
138+
client_secret="{{ config['client_secret'] }}",
139+
config=config_without_client_secret,
140+
parameters={},
141+
grant_type="client_credentials",
142+
)
143+
body = oauth.build_refresh_request_body()
144+
assert body["client_secret"] == ""
145+
131146
def test_refresh_with_decode_config_params(self):
132147
updated_config_fields = {
133148
"client_id": base64.b64encode(config["client_id"].encode("utf-8")).decode(),

unit_tests/sources/declarative/parsers/conftest.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
4+
from pathlib import Path
55
from typing import Any, Dict
66

77
import pytest
@@ -645,10 +645,7 @@ def expected_manifest_with_url_base_linked_definition_normalized() -> Dict[str,
645645

646646
@pytest.fixture
647647
def manifest_with_linked_definitions_url_base_authenticator_abnormal_schemas() -> Dict[str, Any]:
648-
with open(
649-
"unit_tests/sources/declarative/parsers/resources/abnormal_schemas_manifest.yaml",
650-
"r",
651-
) as file:
648+
with open(str(Path(__file__).parent / "resources/abnormal_schemas_manifest.yaml"), "r") as file:
652649
return dict(yaml.safe_load(file))
653650

654651

unit_tests/sources/declarative/requesters/test_http_job_repository.py

Lines changed: 132 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33

44
import json
5+
from turtledemo.sorting_animate import partition
6+
from typing import Optional
57
from unittest import TestCase
68
from unittest.mock import Mock
79

@@ -28,6 +30,8 @@
2830
)
2931
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
3032
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
33+
from airbyte_cdk.sources.message import MessageRepository
34+
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
3135
from airbyte_cdk.sources.types import StreamSlice
3236
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
3337
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
@@ -45,112 +49,12 @@
4549
a_record_id,a_value
4650
"""
4751
_A_CURSOR_FOR_PAGINATION = "a-cursor-for-pagination"
52+
_ERROR_HANDLER = DefaultErrorHandler(config=_ANY_CONFIG, parameters={})
4853

4954

5055
class HttpJobRepositoryTest(TestCase):
5156
def setUp(self) -> None:
52-
message_repository = Mock()
53-
error_handler = DefaultErrorHandler(config=_ANY_CONFIG, parameters={})
54-
55-
self._create_job_requester = HttpRequester(
56-
name="stream <name>: create_job",
57-
url_base=_URL_BASE,
58-
path=_EXPORT_PATH,
59-
error_handler=error_handler,
60-
http_method=HttpMethod.POST,
61-
config=_ANY_CONFIG,
62-
disable_retries=False,
63-
parameters={},
64-
message_repository=message_repository,
65-
use_cache=False,
66-
stream_response=False,
67-
)
68-
69-
self._polling_job_requester = HttpRequester(
70-
name="stream <name>: polling",
71-
url_base=_URL_BASE,
72-
path=_EXPORT_PATH + "/{{creation_response['id']}}",
73-
error_handler=error_handler,
74-
http_method=HttpMethod.GET,
75-
config=_ANY_CONFIG,
76-
disable_retries=False,
77-
parameters={},
78-
message_repository=message_repository,
79-
use_cache=False,
80-
stream_response=False,
81-
)
82-
83-
self._download_retriever = SimpleRetriever(
84-
requester=HttpRequester(
85-
name="stream <name>: fetch_result",
86-
url_base="",
87-
path="{{download_target}}",
88-
error_handler=error_handler,
89-
http_method=HttpMethod.GET,
90-
config=_ANY_CONFIG,
91-
disable_retries=False,
92-
parameters={},
93-
message_repository=message_repository,
94-
use_cache=False,
95-
stream_response=True,
96-
),
97-
record_selector=RecordSelector(
98-
extractor=ResponseToFileExtractor({}),
99-
record_filter=None,
100-
transformations=[],
101-
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
102-
config=_ANY_CONFIG,
103-
parameters={},
104-
),
105-
primary_key=None,
106-
name="any name",
107-
paginator=DefaultPaginator(
108-
decoder=NoopDecoder(),
109-
page_size_option=None,
110-
page_token_option=RequestOption(
111-
field_name="locator",
112-
inject_into=RequestOptionType.request_parameter,
113-
parameters={},
114-
),
115-
pagination_strategy=CursorPaginationStrategy(
116-
cursor_value="{{ headers['Sforce-Locator'] }}",
117-
decoder=NoopDecoder(),
118-
config=_ANY_CONFIG,
119-
parameters={},
120-
),
121-
url_base=_URL_BASE,
122-
config=_ANY_CONFIG,
123-
parameters={},
124-
),
125-
config=_ANY_CONFIG,
126-
parameters={},
127-
)
128-
129-
self._repository = AsyncHttpJobRepository(
130-
creation_requester=self._create_job_requester,
131-
polling_requester=self._polling_job_requester,
132-
download_retriever=self._download_retriever,
133-
abort_requester=None,
134-
delete_requester=None,
135-
status_extractor=DpathExtractor(
136-
decoder=JsonDecoder(parameters={}),
137-
field_path=["status"],
138-
config={},
139-
parameters={} or {},
140-
),
141-
status_mapping={
142-
"ready": AsyncJobStatus.COMPLETED,
143-
"failure": AsyncJobStatus.FAILED,
144-
"pending": AsyncJobStatus.RUNNING,
145-
},
146-
download_target_extractor=DpathExtractor(
147-
decoder=JsonDecoder(parameters={}),
148-
field_path=["urls"],
149-
config={},
150-
parameters={} or {},
151-
),
152-
)
153-
57+
self._repository = self._create_async_job_repository()
15458
self._http_mocker = HttpMocker()
15559
self._http_mocker.__enter__()
15660

@@ -178,6 +82,32 @@ def test_given_different_statuses_when_update_jobs_status_then_update_status_pro
17882
self._repository.update_jobs_status([job])
17983
assert job.status() == AsyncJobStatus.COMPLETED
18084

85+
def test_when_update_jobs_status_then_allow_access_to_stream_slice_information(self) -> None:
86+
stream_slice = StreamSlice(partition={"path": "path_from_slice"}, cursor_slice={})
87+
self._mock_create_response(_A_JOB_ID)
88+
self._http_mocker.get(
89+
HttpRequest(url=f"{_EXPORT_URL}/{stream_slice['path']}/{_A_JOB_ID}"),
90+
HttpResponse(body=json.dumps({"id": _A_JOB_ID, "status": "ready"})),
91+
)
92+
repository = self._create_async_job_repository(HttpRequester(
93+
name="stream <name>: polling",
94+
url_base=_URL_BASE,
95+
path=_EXPORT_PATH + "/{{stream_slice['path']}}/{{creation_response['id']}}",
96+
error_handler=_ERROR_HANDLER,
97+
http_method=HttpMethod.GET,
98+
config=_ANY_CONFIG,
99+
disable_retries=False,
100+
parameters={},
101+
message_repository=Mock(), # this might not align with the rest of the components in async job repository but if message_repository becomes important for tests, please share this instance with the other components
102+
use_cache=False,
103+
stream_response=False,
104+
))
105+
106+
job = repository.start(stream_slice)
107+
repository.update_jobs_status([job])
108+
109+
assert job.status() == AsyncJobStatus.COMPLETED
110+
181111
def test_given_unknown_status_when_update_jobs_status_then_raise_error(self) -> None:
182112
self._mock_create_response(_A_JOB_ID)
183113
self._http_mocker.get(
@@ -277,3 +207,103 @@ def _mock_create_response(self, job_id: str) -> None:
277207
HttpRequest(url=_EXPORT_URL),
278208
HttpResponse(body=json.dumps({"id": job_id})),
279209
)
210+
211+
def _create_async_job_repository(self, polling_job_requester: Optional[HttpRequester] = None) -> AsyncHttpJobRepository:
212+
message_repository = Mock()
213+
create_job_requester = HttpRequester(
214+
name="stream <name>: create_job",
215+
url_base=_URL_BASE,
216+
path=_EXPORT_PATH,
217+
error_handler=_ERROR_HANDLER,
218+
http_method=HttpMethod.POST,
219+
config=_ANY_CONFIG,
220+
disable_retries=False,
221+
parameters={},
222+
message_repository=message_repository,
223+
use_cache=False,
224+
stream_response=False,
225+
)
226+
polling_job_requester = polling_job_requester if polling_job_requester else HttpRequester(
227+
name="stream <name>: polling",
228+
url_base=_URL_BASE,
229+
path=_EXPORT_PATH + "/{{creation_response['id']}}",
230+
error_handler=_ERROR_HANDLER,
231+
http_method=HttpMethod.GET,
232+
config=_ANY_CONFIG,
233+
disable_retries=False,
234+
parameters={},
235+
message_repository=message_repository,
236+
use_cache=False,
237+
stream_response=False,
238+
)
239+
240+
download_retriever = SimpleRetriever(
241+
requester=HttpRequester(
242+
name="stream <name>: fetch_result",
243+
url_base="",
244+
path="{{download_target}}",
245+
error_handler=_ERROR_HANDLER,
246+
http_method=HttpMethod.GET,
247+
config=_ANY_CONFIG,
248+
disable_retries=False,
249+
parameters={},
250+
message_repository=message_repository,
251+
use_cache=False,
252+
stream_response=True,
253+
),
254+
record_selector=RecordSelector(
255+
extractor=ResponseToFileExtractor({}),
256+
record_filter=None,
257+
transformations=[],
258+
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
259+
config=_ANY_CONFIG,
260+
parameters={},
261+
),
262+
primary_key=None,
263+
name="any name",
264+
paginator=DefaultPaginator(
265+
decoder=NoopDecoder(),
266+
page_size_option=None,
267+
page_token_option=RequestOption(
268+
field_name="locator",
269+
inject_into=RequestOptionType.request_parameter,
270+
parameters={},
271+
),
272+
pagination_strategy=CursorPaginationStrategy(
273+
cursor_value="{{ headers['Sforce-Locator'] }}",
274+
decoder=NoopDecoder(),
275+
config=_ANY_CONFIG,
276+
parameters={},
277+
),
278+
url_base=_URL_BASE,
279+
config=_ANY_CONFIG,
280+
parameters={},
281+
),
282+
config=_ANY_CONFIG,
283+
parameters={},
284+
)
285+
286+
return AsyncHttpJobRepository(
287+
creation_requester=create_job_requester,
288+
polling_requester=polling_job_requester,
289+
download_retriever=download_retriever,
290+
abort_requester=None,
291+
delete_requester=None,
292+
status_extractor=DpathExtractor(
293+
decoder=JsonDecoder(parameters={}),
294+
field_path=["status"],
295+
config={},
296+
parameters={} or {},
297+
),
298+
status_mapping={
299+
"ready": AsyncJobStatus.COMPLETED,
300+
"failure": AsyncJobStatus.FAILED,
301+
"pending": AsyncJobStatus.RUNNING,
302+
},
303+
download_target_extractor=DpathExtractor(
304+
decoder=JsonDecoder(parameters={}),
305+
field_path=["urls"],
306+
config={},
307+
parameters={} or {},
308+
),
309+
)

0 commit comments

Comments
 (0)