-
-
Notifications
You must be signed in to change notification settings - Fork 976
Feature: optional pycurl #2269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: optional pycurl #2269
Changes from all commits
c4cce86
7764c89
d547219
5f79c1b
fcd46de
2f0359a
aabaa64
5b40be2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,294 @@ | ||
| """HTTP Client using pyCurl.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections import deque | ||
| from functools import partial | ||
| from io import BytesIO | ||
| from time import time | ||
|
|
||
| from kombu.asynchronous.hub import READ, WRITE, Hub, get_event_loop | ||
| from kombu.exceptions import HttpError | ||
| from kombu.utils.encoding import bytes_to_str | ||
|
|
||
| from .base import BaseClient | ||
|
|
||
try:
    import pycurl
except ImportError:  # pragma: no cover
    # pycurl is optional: keep the module importable and leave placeholders
    # behind; CurlClient.__init__ raises ImportError when pycurl is None.
    pycurl = None
    Curl = None
    METH_TO_CURL = None
else:
    from pycurl import Curl

    # HTTP methods that are switched on through a dedicated pycurl option.
    METH_TO_CURL = dict(
        GET=pycurl.HTTPGET,
        POST=pycurl.POST,
        PUT=pycurl.UPLOAD,
        HEAD=pycurl.NOBODY,
    )
|
|
||
# Names exported by ``from <module> import *``.
__all__ = ('CurlClient',)

# User-Agent sent when a request does not provide its own ``user_agent``.
DEFAULT_USER_AGENT = 'Mozilla/5.0 (compatible; pycurl)'
# NOTE(review): not referenced by the code visible in this file; presumably
# the verbs expected to go through CUSTOMREQUEST -- confirm before relying on it.
EXTRA_METHODS = frozenset(['DELETE', 'OPTIONS', 'PATCH'])
|
|
||
|
|
||
class CurlClient(BaseClient):
    """Curl HTTP Client.

    Multiplexes requests over a single ``pycurl.CurlMulti`` handle using a
    fixed pool of ``max_clients`` reusable easy handles.  Requests added
    while every handle is busy wait in a FIFO queue until a handle is
    returned to the free list.
    """

    Curl = Curl

    def __init__(self, hub: Hub | None = None, max_clients: int = 10):
        """Create the multi handle, the easy-handle pool and the hub timer.

        Arguments:
            hub: Event loop to attach to; defaults to ``get_event_loop()``.
            max_clients: Number of easy handles to create, i.e. the number
                of transfers that can be in flight at once.

        Raises:
            ImportError: If pycurl could not be imported.
        """
        if pycurl is None:
            raise ImportError('The curl client requires the pycurl library.')
        hub = hub or get_event_loop()
        super().__init__(hub)
        self.max_clients = max_clients

        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
        self._curls = [self.Curl() for _ in range(max_clients)]
        self._free_list = self._curls[:]
        self._pending = deque()
        # fd -> READ/WRITE bitmask that curl asked us to watch.
        self._fds = {}

        self._socket_action = self._multi.socket_action
        # Safety net: poll curl every second regardless of socket activity.
        self._timeout_check_tref = self.hub.call_repeatedly(
            1.0, self._timeout_check,
        )

        # pycurl 7.29.0 workaround
        dummy_curl_handle = pycurl.Curl()
        self._multi.add_handle(dummy_curl_handle)
        self._multi.remove_handle(dummy_curl_handle)

    def close(self):
        """Cancel the periodic check and close every curl handle."""
        self._timeout_check_tref.cancel()
        for _curl in self._curls:
            _curl.close()
        self._multi.close()

    def add_request(self, request):
        """Queue ``request``, start it if a handle is free, and return it."""
        self._pending.append(request)
        self._process_queue()
        # Ask for an immediate check so curl starts driving the transfer.
        self._set_timeout(0)
        return request

    # the next two methods are used for linux/epoll workaround:
    # we temporarily remove all curl fds from hub, so curl cannot
    # close a fd which is still inside epoll
    def _pop_from_hub(self):
        """Remove every fd tracked in ``self._fds`` from the hub."""
        for fd in self._fds:
            self.hub.remove(fd)

    def _push_to_hub(self):
        """Re-register every tracked fd with the hub for its events."""
        for fd, events in self._fds.items():
            if events & READ:
                self.hub.add_reader(fd, self.on_readable, fd)
            if events & WRITE:
                self.hub.add_writer(fd, self.on_writable, fd)

    def _handle_socket(self, event, fd, multi, data, _pycurl=pycurl):
        """Record which events curl wants watched on ``fd``.

        Installed as ``M_SOCKETFUNCTION``.  Only ``self._fds`` is updated
        here; the hub registration happens in :meth:`_push_to_hub`.
        Unknown event values leave ``self._fds`` untouched.
        """
        if event == _pycurl.POLL_REMOVE:
            self._fds.pop(fd, None)
        elif event == _pycurl.POLL_IN:
            self._fds[fd] = READ
        elif event == _pycurl.POLL_OUT:
            self._fds[fd] = WRITE
        elif event == _pycurl.POLL_INOUT:
            self._fds[fd] = READ | WRITE

    def _set_timeout(self, msecs):
        """Schedule a :meth:`_timeout_check` after ``msecs`` milliseconds.

        Installed as ``M_TIMERFUNCTION``, so libcurl supplies the delay in
        milliseconds, while the hub timers used by this class take seconds
        (see the ``1.0`` passed to ``call_repeatedly`` in ``__init__``).
        """
        if msecs < 0:
            # libcurl passes -1 to ask for its timer to be dropped, so there
            # is nothing new to schedule; the periodic check still runs.
            return
        self.hub.call_later(msecs / 1000.0, self._timeout_check)

    def _timeout_check(self, _pycurl=pycurl):
        """Let curl process all sockets, then handle finished transfers."""
        self._pop_from_hub()
        try:
            while True:
                try:
                    ret, _ = self._multi.socket_all()
                except _pycurl.error as exc:
                    ret = exc.args[0]
                if ret != _pycurl.E_CALL_MULTI_PERFORM:
                    break
        finally:
            self._push_to_hub()
        self._process_pending_requests()

    def on_readable(self, fd, _pycurl=pycurl):
        """Hub callback: ``fd`` is ready for reading."""
        return self._on_event(fd, _pycurl.CSELECT_IN)

    def on_writable(self, fd, _pycurl=pycurl):
        """Hub callback: ``fd`` is ready for writing."""
        return self._on_event(fd, _pycurl.CSELECT_OUT)

    def _on_event(self, fd, event, _pycurl=pycurl):
        """Tell curl about ``event`` on ``fd``, then handle finished transfers."""
        self._pop_from_hub()
        try:
            while True:
                try:
                    ret, _ = self._socket_action(fd, event)
                except _pycurl.error as exc:
                    ret = exc.args[0]
                if ret != _pycurl.E_CALL_MULTI_PERFORM:
                    break
        finally:
            self._push_to_hub()
        self._process_pending_requests()

    def _process_pending_requests(self):
        """Drain curl's completion queue and start queued requests."""
        while True:
            q, succeeded, failed = self._multi.info_read()
            for curl in succeeded:
                self._process(curl)
            for curl, errno, reason in failed:
                self._process(curl, errno, reason)
            if q == 0:
                break
        self._process_queue()

    def _process_queue(self):
        """Start pending requests for as long as free handles are available.

        If configuring or registering a handle raises, the handle is put
        back on the free list before the exception propagates, so a bad
        request cannot permanently shrink the pool.
        """
        while self._free_list and self._pending:
            curl = self._free_list.pop()
            request = self._pending.popleft()
            headers = self.Headers()
            buf = BytesIO()
            curl.info = {
                'headers': headers,
                'buffer': buf,
                'request': request,
                'curl_start_time': time(),
            }
            try:
                self._setup_request(curl, request, buf, headers)
                self._multi.add_handle(curl)
            except Exception:
                curl.info = None
                self._free_list.append(curl)
                raise

    def _process(self, curl, errno=None, reason=None, _pycurl=pycurl):
        """Finish the transfer on ``curl`` and hand a response to the request.

        The handle is detached from the multi handle and returned to the
        free list.  With a truthy ``errno`` the response carries code 599,
        an ``HttpError`` (with ``errno`` attached) and no buffer; otherwise
        it carries the HTTP code, the effective URL and the rewound buffer.
        """
        info, curl.info = curl.info, None
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = info['buffer']
        if errno:
            code = 599
            error = HttpError(code, reason)
            error.errno = errno
            effective_url = None
            buffer.close()
            buffer = None
        else:
            error = None
            code = curl.getinfo(_pycurl.HTTP_CODE)
            effective_url = curl.getinfo(_pycurl.EFFECTIVE_URL)
            buffer.seek(0)
        request = info['request']
        request.on_ready(self.Response(
            request=request, code=code, headers=info['headers'],
            buffer=buffer, effective_url=effective_url, error=error,
        ))

    def _setup_request(self, curl, request, buffer, headers, _pycurl=pycurl):
        """Configure the easy handle ``curl`` to perform ``request``.

        Arguments:
            curl: Easy handle taken from the pool.
            request: Request object describing URL, method, headers, body,
                proxy, TLS and authentication settings.
            buffer: Writable object receiving the response body unless
                ``request.on_stream`` is set.
            headers: Header container created for the response.

        Raises:
            ValueError: If ``request.proxy_host`` is set without
                ``request.proxy_port``.
            KeyError: If ``request.auth_mode`` is neither ``'basic'`` nor
                ``'digest'`` while ``request.auth_username`` is set.
        """
        # NOTE(review): handles are reused and only some options are reset
        # below; options such as PROXY or INTERFACE set for an earlier
        # request appear to persist on the handle -- confirm this is intended.
        setopt = curl.setopt
        setopt(_pycurl.URL, bytes_to_str(request.url))

        # see tornado curl client
        request.headers.setdefault('Expect', '')
        request.headers.setdefault('Pragma', '')

        setopt(
            _pycurl.HTTPHEADER,
            [f'{name}: {value}' for name, value in request.headers.items()],
        )

        # NOTE(review): the ``headers`` argument is not used; the header
        # callback is bound to ``request.headers`` -- confirm this is intended.
        setopt(
            _pycurl.HEADERFUNCTION,
            partial(request.on_header or self.on_header, request.headers),
        )
        setopt(
            _pycurl.WRITEFUNCTION, request.on_stream or buffer.write,
        )
        setopt(
            _pycurl.FOLLOWLOCATION, request.follow_redirects,
        )
        setopt(
            _pycurl.USERAGENT,
            bytes_to_str(request.user_agent or DEFAULT_USER_AGENT),
        )
        if request.network_interface:
            setopt(_pycurl.INTERFACE, request.network_interface)
        setopt(
            _pycurl.ENCODING, 'gzip,deflate' if request.use_gzip else 'none',
        )
        if request.proxy_host:
            if not request.proxy_port:
                raise ValueError('Request with proxy_host but no proxy_port')
            setopt(_pycurl.PROXY, request.proxy_host)
            setopt(_pycurl.PROXYPORT, request.proxy_port)
            if request.proxy_username:
                proxy_password = request.proxy_password or ''
                setopt(
                    _pycurl.PROXYUSERPWD,
                    f'{request.proxy_username}:{proxy_password}',
                )

        setopt(_pycurl.SSL_VERIFYPEER, 1 if request.validate_cert else 0)
        setopt(_pycurl.SSL_VERIFYHOST, 2 if request.validate_cert else 0)
        if request.ca_certs is not None:
            setopt(_pycurl.CAINFO, request.ca_certs)

        setopt(_pycurl.IPRESOLVE, _pycurl.IPRESOLVE_WHATEVER)

        # Clear every method switch first: the handle may have been used
        # for a different method by an earlier request.
        for meth in METH_TO_CURL.values():
            setopt(meth, False)
        try:
            meth = METH_TO_CURL[request.method]
        except KeyError:
            curl.setopt(_pycurl.CUSTOMREQUEST, request.method)
        else:
            curl.unsetopt(_pycurl.CUSTOMREQUEST)
            setopt(meth, True)

        if request.method in ('POST', 'PUT'):
            body = request.body or b''
            if not isinstance(body, bytes):
                body = body.encode('utf-8')

            reqbuffer = BytesIO(body)
            setopt(_pycurl.READFUNCTION, reqbuffer.read)
            if request.method == 'POST':

                def ioctl(cmd):
                    # Rewind the body when curl asks to restart reading it.
                    if cmd == _pycurl.IOCMD_RESTARTREAD:
                        reqbuffer.seek(0)

                setopt(_pycurl.IOCTLFUNCTION, ioctl)
                setopt(_pycurl.POSTFIELDSIZE, len(body))
            else:
                setopt(_pycurl.INFILESIZE, len(body))
        elif request.method == 'GET':
            # NOTE(review): this check disappears under ``python -O``.
            assert not request.body

        if request.auth_username is not None:
            auth_mode = {
                'basic': _pycurl.HTTPAUTH_BASIC,
                'digest': _pycurl.HTTPAUTH_DIGEST
            }[request.auth_mode or 'basic']
            setopt(_pycurl.HTTPAUTH, auth_mode)
            auth_password = request.auth_password or ''
            userpwd = f'{request.auth_username}:{auth_password}'
            setopt(_pycurl.USERPWD, userpwd)
        else:
            curl.unsetopt(_pycurl.USERPWD)

        if request.client_cert is not None:
            setopt(_pycurl.SSLCERT, request.client_cert)
        if request.client_key is not None:
            setopt(_pycurl.SSLKEY, request.client_key)

        # Last-chance hook: lets the request tweak the handle directly.
        if request.on_prepare is not None:
            request.on_prepare(curl)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,3 +7,4 @@ bumpversion==0.6.0 | |
| pydocstyle==6.3.0 | ||
| mypy==1.14.1 | ||
| typing_extensions==4.12.2; python_version<"3.10" | ||
| types-pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What is types-pycurl? Did the package name change?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was suggested by mypy. Otherwise mypy ci was failing
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Weren't we using only pycurl?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I only started getting this error now, and types-pycurl was suggested. The error was related to the mypy build (which I am not very familiar with). Then I saw that other types-* packages are already used in the same repo. And since pycurl is optional, the full pycurl dependency is not required, and types-pycurl is sufficient to pass the mypy build. I suppose the mypy build does type checking, which is why it errored.
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this work as expected? We are not using any try/except block here. I'm open to better suggestions as well.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at try/except, but the linter was complaining. Also, the `.curl` code already has this ImportError exception handling, so until `CurlClient` is instantiated there will be no exception. However, `CurlClient.Curl` will be `None`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to test the code now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code seems to work without `pycurl`, and no exceptions are thrown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok