|
7 | 7 | import copy
|
8 | 8 | import json
|
9 | 9 | import logging
|
10 |
| -import shutil |
11 | 10 | import uuid
|
12 | 11 | from contextvars import ContextVar
|
13 | 12 | from http import HTTPStatus
|
@@ -343,7 +342,8 @@ def update_headers(self, header: Dict[str, str]):
|
343 | 342 | def _handle_file_download(self, raw_response: Any, file_path: str) -> str:
|
344 | 343 | try:
|
345 | 344 | download_file = open(file_path, "wb")
|
346 |
| - shutil.copyfileobj(raw_response, download_file) |
| 345 | + for chunk in raw_response: |
| 346 | + download_file.write(chunk) |
347 | 347 | except Exception as err:
|
348 | 348 | raise ErrorCode.UNABLE_TO_DOWNLOAD_FILE.exception_with_parameters(
|
349 | 349 | str((hasattr(err, "strerror") and err.strerror) or err), file_path
|
@@ -374,15 +374,49 @@ def _call_api_internal(
|
374 | 374 | timeout=timeout,
|
375 | 375 | )
|
376 | 376 | elif api.consumes == EVENT_STREAM and api.produces == EVENT_STREAM:
|
377 |
| - response = self._session.request( |
| 377 | + with self._session.stream( |
378 | 378 | api.method.value,
|
379 | 379 | path,
|
380 | 380 | **params,
|
381 |
| - stream=True, |
382 | 381 | timeout=timeout,
|
383 |
| - ) |
384 |
| - if download_file_path: |
385 |
| - return self._handle_file_download(response.raw, download_file_path) |
| 382 | + ) as stream_response: |
| 383 | + if download_file_path: |
| 384 | + return self._handle_file_download( |
| 385 | + stream_response.iter_raw(), download_file_path |
| 386 | + ) |
| 387 | + |
| 388 | + # For event streams, we need to read the content while the stream is open |
| 389 | + # Store the response data and create a mock response object for common processing |
| 390 | + content = stream_response.read() |
| 391 | + text = content.decode("utf-8") if content else "" |
| 392 | + lines = [] |
| 393 | + |
| 394 | + # Only process lines for successful responses to avoid errors on error responses |
| 395 | + if stream_response.status_code == api.expected_status: |
| 396 | + # Reset stream position and get lines |
| 397 | + lines = text.splitlines() if text else [] |
| 398 | + |
| 399 | + response_data = { |
| 400 | + "status_code": stream_response.status_code, |
| 401 | + "headers": stream_response.headers, |
| 402 | + "text": text, |
| 403 | + "content": content, |
| 404 | + "lines": lines, |
| 405 | + } |
| 406 | + |
| 407 | + # Create a simple namespace object to mimic the response interface |
| 408 | + response = SimpleNamespace( |
| 409 | + status_code=response_data["status_code"], |
| 410 | + headers=response_data["headers"], |
| 411 | + text=response_data["text"], |
| 412 | + content=response_data["content"], |
| 413 | + _stream_lines=response_data[ |
| 414 | + "lines" |
| 415 | + ], # Store lines for event processing |
| 416 | + json=lambda: json.loads(response_data["text"]) |
| 417 | + if response_data["text"] |
| 418 | + else {}, |
| 419 | + ) |
386 | 420 | else:
|
387 | 421 | response = self._session.request(
|
388 | 422 | api.method.value,
|
@@ -429,14 +463,16 @@ def _call_api_internal(
|
429 | 463 | response,
|
430 | 464 | )
|
431 | 465 | if api.consumes == EVENT_STREAM and api.produces == EVENT_STREAM:
|
432 |
| - for line in response.iter_lines(decode_unicode=True): |
433 |
| - if not line: |
434 |
| - continue |
435 |
| - if not line.startswith("data: "): |
436 |
| - raise ErrorCode.UNABLE_TO_DESERIALIZE.exception_with_parameters( |
437 |
| - line |
438 |
| - ) |
439 |
| - events.append(json.loads(line.split("data: ")[1])) |
| 466 | + # Process event stream using stored lines from the streaming response |
| 467 | + if hasattr(response, "_stream_lines"): |
| 468 | + for line in response._stream_lines: |
| 469 | + if not line: |
| 470 | + continue |
| 471 | + if not line.startswith("data: "): |
| 472 | + raise ErrorCode.UNABLE_TO_DESERIALIZE.exception_with_parameters( |
| 473 | + line |
| 474 | + ) |
| 475 | + events.append(json.loads(line.split("data: ")[1])) |
440 | 476 | if text_response:
|
441 | 477 | response_ = response.text
|
442 | 478 | else:
|
|
0 commit comments