Skip to content

Commit 96e2a59

Browse files
committed
Merged and updated imports
1 parent a0cadf5 commit 96e2a59

File tree

5 files changed

+877
-822
lines changed

5 files changed

+877
-822
lines changed

airbyte_cdk/sources/declarative/decoders/json_decoder.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
#
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
4-
import codecs
4+
# import codecs
55
import logging
66
from dataclasses import InitVar, dataclass
7-
from gzip import decompress
87
from typing import Any, Generator, List, Mapping, MutableMapping, Optional
98

10-
import orjson
119
import requests
1210

11+
from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, JsonParser
1312
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
1413

1514
logger = logging.getLogger("airbyte")
@@ -35,15 +34,14 @@ def decode(
3534
Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping.
3635
"""
3736
try:
38-
yield self._decoder.decode(response)
37+
yield from self._decoder.decode(response)
3938
except requests.exceptions.JSONDecodeError:
4039
logger.warning(
4140
f"Response cannot be parsed into json: {response.status_code=}, {response.text=}"
4241
)
4342
yield {}
4443

4544

46-
4745
@dataclass
4846
class IterableDecoder(Decoder):
4947
"""

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ def filter_and_transform(
108108
Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could
109109
share the logic of doing transformations on a set of records.
110110
"""
111+
111112
if self.transform_before_filtering:
112113
transformed_data = self._transform(all_data, stream_state, stream_slice)
113114
transformed_filtered_data = self._filter(
@@ -117,10 +118,8 @@ def filter_and_transform(
117118
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
118119
transformed_filtered_data = self._transform(filtered_data, stream_state, stream_slice)
119120

120-
no_service_fields_data = remove_service_keys(transformed_data)
121-
normalized_data = self._normalize_by_schema(
122-
no_service_fields_data, schema=records_schema
123-
)
121+
no_service_fields_data = remove_service_keys(transformed_filtered_data)
122+
normalized_data = self._normalize_by_schema(no_service_fields_data, schema=records_schema)
124123
for data in normalized_data:
125124
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
126125

0 commit comments

Comments
 (0)