|
15 | 15 | from typing import Any, DefaultDict, Iterable, List, Mapping, Optional |
16 | 16 | from urllib.parse import urlparse |
17 | 17 |
|
| 18 | +import orjson |
18 | 19 | import requests |
19 | | -from orjson import orjson |
20 | 20 | from requests import PreparedRequest, Response, Session |
21 | 21 |
|
22 | 22 | from airbyte_cdk.connector import TConfig |
@@ -129,7 +129,11 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: |
129 | 129 |
|
130 | 130 | source_spec: ConnectorSpecification = self.source.spec(self.logger) |
131 | 131 | try: |
132 | | - with tempfile.TemporaryDirectory() as temp_dir: |
| 132 | + with tempfile.TemporaryDirectory( |
| 133 | + # Cleanup can fail on Windows due to file locks. Ignore if so, |
| 134 | + # rather than failing the whole process. |
| 135 | + ignore_cleanup_errors=True, |
| 136 | + ) as temp_dir: |
133 | 137 | os.environ[ENV_REQUEST_CACHE_PATH] = ( |
134 | 138 | temp_dir # set this as default directory for request_cache to store *.sqlite files |
135 | 139 | ) |
@@ -246,12 +250,18 @@ def handle_record_counts( |
246 | 250 | ) -> AirbyteMessage: |
247 | 251 | match message.type: |
248 | 252 | case Type.RECORD: |
| 253 | + if message.record is None: |
| 254 | + raise ValueError("Record message must have a record attribute") |
| 255 | + |
249 | 256 | stream_message_count[ |
250 | 257 | HashableStreamDescriptor( |
251 | 258 | name=message.record.stream, namespace=message.record.namespace |
252 | 259 | ) |
253 | 260 | ] += 1.0 # type: ignore[union-attr] # record has `stream` and `namespace` |
254 | 261 | case Type.STATE: |
| 262 | + if message.state is None: |
| 263 | + raise ValueError("State message must have a state attribute") |
| 264 | + |
255 | 265 | stream_descriptor = message_utils.get_stream_descriptor(message) |
256 | 266 |
|
257 | 267 | # Set record count from the counter onto the state message |
|
0 commit comments