Skip to content

Commit 202e79e

Browse files
committed
replace httpx with impit
1 parent bfdf114 commit 202e79e

25 files changed

+276
-193
lines changed

docs/02_concepts/09_streaming.mdx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Supported streaming methods:
1818
- [`KeyValueStoreClient.stream_record`](/reference/class/KeyValueStoreClient#stream_record) - Stream key-value store records as raw data.
1919
- [`LogClient.stream`](/reference/class/LogClient#stream) - Stream logs in real time.
2020

21-
These methods return a raw, context-managed `httpx.Response` object. The response must be consumed within a with block to ensure that the connection is closed automatically, preventing memory leaks or unclosed connections.
21+
These methods return a raw, context-managed `impit.Response` object. The response must be consumed within a with block to ensure that the connection is closed automatically, preventing memory leaks or unclosed connections.
2222

2323
The following example demonstrates how to stream the logs of an Actor run incrementally:
2424

docs/02_concepts/code/01_async_support.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ async def main() -> None:
1717
# Stream the logs
1818
async with log_client.stream() as async_log_stream:
1919
if async_log_stream:
20-
async for line in async_log_stream.aiter_lines():
21-
print(line)
20+
async for bytes_chunk in async_log_stream.aiter_bytes():
21+
print(bytes_chunk)
2222

2323

2424
if __name__ == '__main__':

docs/02_concepts/code/09_streaming_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ async def main() -> None:
1010

1111
async with log_client.stream() as log_stream:
1212
if log_stream:
13-
for line in log_stream.iter_lines():
14-
print(line)
13+
async for bytes_chunk in log_stream.aiter_bytes():
14+
print(bytes_chunk)

docs/02_concepts/code/09_streaming_sync.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ def main() -> None:
1010

1111
with log_client.stream() as log_stream:
1212
if log_stream:
13-
for line in log_stream.iter_lines():
14-
print(line)
13+
for bytes_chunk in log_stream.iter_bytes():
14+
print(bytes_chunk)

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ dependencies = [
2828
"apify-shared<2.0.0",
2929
"colorama>=0.4.0",
3030
"httpx>=0.25",
31+
"impit>=0.5.1",
3132
"more_itertools>=10.0.0",
3233
]
3334

src/apify_client/_errors.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from __future__ import annotations
22

3-
import httpx
3+
import json as jsonlib
4+
5+
import impit
46
from apify_shared.utils import ignore_docs
57

68

@@ -17,20 +19,21 @@ class ApifyApiError(ApifyClientError):
1719
"""
1820

1921
@ignore_docs
20-
def __init__(self, response: httpx.Response, attempt: int) -> None:
22+
def __init__(self, response: impit.Response, attempt: int, method: str = 'GET') -> None:
2123
"""Initialize a new instance.
2224
2325
Args:
2426
response: The response to the failed API call.
2527
attempt: Which attempt was the request that failed.
28+
method: The HTTP method used for the request.
2629
"""
2730
self.message: str | None = None
2831
self.type: str | None = None
2932
self.data = dict[str, str]()
3033

3134
self.message = f'Unexpected error: {response.text}'
3235
try:
33-
response_data = response.json()
36+
response_data = jsonlib.loads(response.text)
3437
if 'error' in response_data:
3538
self.message = response_data['error']['message']
3639
self.type = response_data['error']['type']
@@ -44,7 +47,7 @@ def __init__(self, response: httpx.Response, attempt: int) -> None:
4447
self.name = 'ApifyApiError'
4548
self.status_code = response.status_code
4649
self.attempt = attempt
47-
self.http_method = response.request.method
50+
self.http_method = method
4851

4952
# TODO: self.client_method # noqa: TD003
5053
# TODO: self.original_stack # noqa: TD003
@@ -61,7 +64,7 @@ class InvalidResponseBodyError(ApifyClientError):
6164
"""
6265

6366
@ignore_docs
64-
def __init__(self, response: httpx.Response) -> None:
67+
def __init__(self, response: impit.Response) -> None:
6568
"""Initialize a new instance.
6669
6770
Args:
@@ -80,8 +83,8 @@ def is_retryable_error(exc: Exception) -> bool:
8083
exc,
8184
(
8285
InvalidResponseBodyError,
83-
httpx.NetworkError,
84-
httpx.TimeoutException,
85-
httpx.RemoteProtocolError,
86+
impit.NetworkError,
87+
impit.TimeoutException,
88+
impit.RemoteProtocolError,
8689
),
8790
)

src/apify_client/_http_client.py

Lines changed: 49 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
from http import HTTPStatus
99
from importlib import metadata
1010
from typing import TYPE_CHECKING, Any
11+
from urllib.parse import urlencode
1112

12-
import httpx
13+
import impit
1314
from apify_shared.utils import ignore_docs, is_content_type_json, is_content_type_text, is_content_type_xml
1415

1516
from apify_client._errors import ApifyApiError, InvalidResponseBodyError, is_retryable_error
@@ -59,13 +60,13 @@ def __init__(
5960
if token is not None:
6061
headers['Authorization'] = f'Bearer {token}'
6162

62-
self.httpx_client = httpx.Client(headers=headers, follow_redirects=True, timeout=timeout_secs)
63-
self.httpx_async_client = httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=timeout_secs)
63+
self.impit_client = impit.Client(headers=headers, follow_redirects=True, timeout=timeout_secs)
64+
self.impit_async_client = impit.AsyncClient(headers=headers, follow_redirects=True, timeout=timeout_secs)
6465

6566
self.stats = stats or Statistics()
6667

6768
@staticmethod
68-
def _maybe_parse_response(response: httpx.Response) -> Any:
69+
def _maybe_parse_response(response: impit.Response) -> Any:
6970
if response.status_code == HTTPStatus.NO_CONTENT:
7071
return None
7172

@@ -75,7 +76,7 @@ def _maybe_parse_response(response: httpx.Response) -> Any:
7576

7677
try:
7778
if is_content_type_json(content_type):
78-
return response.json()
79+
return jsonlib.loads(response.text)
7980
elif is_content_type_xml(content_type) or is_content_type_text(content_type): # noqa: RET505
8081
return response.text
8182
else:
@@ -131,6 +132,21 @@ def _prepare_request_call(
131132
data,
132133
)
133134

135+
def _build_url_with_params(self, url: str, params: dict | None = None) -> str:
136+
if not params:
137+
return url
138+
139+
param_pairs: list[tuple[str, str]] = []
140+
for key, value in params.items():
141+
if isinstance(value, list):
142+
param_pairs.extend((key, str(v)) for v in value)
143+
else:
144+
param_pairs.append((key, str(value)))
145+
146+
query_string = urlencode(param_pairs)
147+
148+
return f'{url}?{query_string}'
149+
134150

135151
class HTTPClient(_BaseHTTPClient):
136152
def call(
@@ -145,7 +161,7 @@ def call(
145161
stream: bool | None = None,
146162
parse_response: bool | None = True,
147163
timeout_secs: int | None = None,
148-
) -> httpx.Response:
164+
) -> impit.Response:
149165
log_context.method.set(method)
150166
log_context.url.set(url)
151167

@@ -156,41 +172,34 @@ def call(
156172

157173
headers, params, content = self._prepare_request_call(headers, params, data, json)
158174

159-
httpx_client = self.httpx_client
175+
impit_client = self.impit_client
160176

161-
def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
177+
def _make_request(stop_retrying: Callable, attempt: int) -> impit.Response:
162178
log_context.attempt.set(attempt)
163179
logger.debug('Sending request')
164180

165181
self.stats.requests += 1
166182

167183
try:
168-
request = httpx_client.build_request(
184+
# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
185+
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
186+
187+
url_with_params = self._build_url_with_params(url, params)
188+
189+
response = impit_client.request(
169190
method=method,
170-
url=url,
191+
url=url_with_params,
171192
headers=headers,
172-
params=params,
173193
content=content,
174-
)
175-
176-
# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
177-
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
178-
request.extensions['timeout'] = {
179-
'connect': timeout,
180-
'pool': timeout,
181-
'read': timeout,
182-
'write': timeout,
183-
}
184-
185-
response = httpx_client.send(
186-
request=request,
194+
timeout=timeout,
187195
stream=stream or False,
188196
)
189197

190198
# If response status is < 300, the request was successful, and we can return the result
191199
if response.status_code < 300: # noqa: PLR2004
192200
logger.debug('Request successful', extra={'status_code': response.status_code})
193-
if not stream:
201+
# TODODO Impit does not support setting custom attributes on the response object,
202+
if not stream and response.content == b'A unique condition for checking types. ABRACADABRA':
194203
_maybe_parsed_body = (
195204
self._maybe_parse_response(response) if parse_response else response.content
196205
)
@@ -214,7 +223,7 @@ def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
214223
if response.status_code < 500 and response.status_code != HTTPStatus.TOO_MANY_REQUESTS: # noqa: PLR2004
215224
logger.debug('Status code is not retryable', extra={'status_code': response.status_code})
216225
stop_retrying()
217-
raise ApifyApiError(response, attempt)
226+
raise ApifyApiError(response, attempt, method=method)
218227

219228
return retry_with_exp_backoff(
220229
_make_request,
@@ -238,7 +247,7 @@ async def call(
238247
stream: bool | None = None,
239248
parse_response: bool | None = True,
240249
timeout_secs: int | None = None,
241-
) -> httpx.Response:
250+
) -> impit.Response:
242251
log_context.method.set(method)
243252
log_context.url.set(url)
244253

@@ -249,38 +258,31 @@ async def call(
249258

250259
headers, params, content = self._prepare_request_call(headers, params, data, json)
251260

252-
httpx_async_client = self.httpx_async_client
261+
impit_async_client = self.impit_async_client
253262

254-
async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
263+
async def _make_request(stop_retrying: Callable, attempt: int) -> impit.Response:
255264
log_context.attempt.set(attempt)
256265
logger.debug('Sending request')
257266
try:
258-
request = httpx_async_client.build_request(
267+
# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
268+
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
269+
270+
url_with_params = self._build_url_with_params(url, params)
271+
272+
response = await impit_async_client.request(
259273
method=method,
260-
url=url,
274+
url=url_with_params,
261275
headers=headers,
262-
params=params,
263276
content=content,
264-
)
265-
266-
# Increase timeout with each attempt. Max timeout is bounded by the client timeout.
267-
timeout = min(self.timeout_secs, (timeout_secs or self.timeout_secs) * 2 ** (attempt - 1))
268-
request.extensions['timeout'] = {
269-
'connect': timeout,
270-
'pool': timeout,
271-
'read': timeout,
272-
'write': timeout,
273-
}
274-
275-
response = await httpx_async_client.send(
276-
request=request,
277+
timeout=timeout,
277278
stream=stream or False,
278279
)
279280

280281
# If response status is < 300, the request was successful, and we can return the result
281282
if response.status_code < 300: # noqa: PLR2004
282283
logger.debug('Request successful', extra={'status_code': response.status_code})
283-
if not stream:
284+
# TODODO Impit does not support setting custom attributes on the response object,
285+
if not stream and response.content == b'A unique condition for checking types. ABRACADABRA':
284286
_maybe_parsed_body = (
285287
self._maybe_parse_response(response) if parse_response else response.content
286288
)
@@ -304,7 +306,7 @@ async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response
304306
if response.status_code < 500 and response.status_code != HTTPStatus.TOO_MANY_REQUESTS: # noqa: PLR2004
305307
logger.debug('Status code is not retryable', extra={'status_code': response.status_code})
306308
stop_retrying()
307-
raise ApifyApiError(response, attempt)
309+
raise ApifyApiError(response, attempt, method=method)
308310

309311
return await retry_with_exp_backoff_async(
310312
_make_request,

src/apify_client/clients/base/actor_job_base_client.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import json as jsonlib
45
import math
56
import time
67
from datetime import datetime, timezone
@@ -39,7 +40,7 @@ def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
3940
method='GET',
4041
params=self._params(waitForFinish=wait_for_finish),
4142
)
42-
job = parse_date_fields(pluck_data(response.json()))
43+
job = parse_date_fields(pluck_data(jsonlib.loads(response.text)))
4344

4445
seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())
4546
if ActorJobStatus(job['status']).is_terminal or (
@@ -70,7 +71,7 @@ def _abort(self, *, gracefully: bool | None = None) -> dict:
7071
method='POST',
7172
params=self._params(gracefully=gracefully),
7273
)
73-
return parse_date_fields(pluck_data(response.json()))
74+
return parse_date_fields(pluck_data(jsonlib.loads(response.text)))
7475

7576

7677
@ignore_docs
@@ -94,7 +95,7 @@ async def _wait_for_finish(self, wait_secs: int | None = None) -> dict | None:
9495
method='GET',
9596
params=self._params(waitForFinish=wait_for_finish),
9697
)
97-
job = parse_date_fields(pluck_data(response.json()))
98+
job = parse_date_fields(pluck_data(jsonlib.loads(response.text)))
9899

99100
seconds_elapsed = math.floor((datetime.now(timezone.utc) - started_at).total_seconds())
100101
if ActorJobStatus(job['status']).is_terminal or (
@@ -125,4 +126,4 @@ async def _abort(self, *, gracefully: bool | None = None) -> dict:
125126
method='POST',
126127
params=self._params(gracefully=gracefully),
127128
)
128-
return parse_date_fields(pluck_data(response.json()))
129+
return parse_date_fields(pluck_data(jsonlib.loads(response.text)))

src/apify_client/clients/base/resource_client.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations
22

3+
import json as jsonlib
4+
35
from apify_shared.utils import ignore_docs, parse_date_fields
46

57
from apify_client._errors import ApifyApiError
@@ -20,7 +22,7 @@ def _get(self, timeout_secs: int | None = None) -> dict | None:
2022
timeout_secs=timeout_secs,
2123
)
2224

23-
return parse_date_fields(pluck_data(response.json()))
25+
return parse_date_fields(pluck_data(jsonlib.loads(response.text)))
2426

2527
except ApifyApiError as exc:
2628
catch_not_found_or_throw(exc)
@@ -36,7 +38,7 @@ def _update(self, updated_fields: dict, timeout_secs: int | None = None) -> dict
3638
timeout_secs=timeout_secs,
3739
)
3840

39-
return parse_date_fields(pluck_data(response.json()))
41+
return parse_date_fields(pluck_data(jsonlib.loads(response.text)))
4042

4143
def _delete(self, timeout_secs: int | None = None) -> None:
4244
try:
@@ -64,7 +66,7 @@ async def _get(self, timeout_secs: int | None = None) -> dict | None:
6466
timeout_secs=timeout_secs,
6567
)
6668

67-
return parse_date_fields(pluck_data(response.json()))
69+
return parse_date_fields(pluck_data(jsonlib.loads(response.text)))
6870

6971
except ApifyApiError as exc:
7072
catch_not_found_or_throw(exc)
@@ -80,7 +82,7 @@ async def _update(self, updated_fields: dict, timeout_secs: int | None = None) -
8082
timeout_secs=timeout_secs,
8183
)
8284

83-
return parse_date_fields(pluck_data(response.json()))
85+
return parse_date_fields(pluck_data(jsonlib.loads(response.text)))
8486

8587
async def _delete(self, timeout_secs: int | None = None) -> None:
8688
try:

0 commit comments

Comments
 (0)