Skip to content

Commit e47d1ef

Browse files
committed
removed old, urllib2-based transport class, and renamed urllib3 transport (#107)
closes #107
1 parent 11e5ab3 commit e47d1ef

File tree

16 files changed

+157
-264
lines changed

16 files changed

+157
-264
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ https://github.com/elastic/apm-agent-python/compare/v1.0.0.dev3\...master[Check
1616
* changed name that the agent reports itself with to the APM server from `elasticapm-python` to `python`. This aligns the Python agent with other languages. ({pull}104[#104])
1717
* changed Celery integration to store the task state (e.g. `SUCCESS` or `FAILURE`) in `transaction.result` ({pull}100[#100])
1818
* BREAKING: renamed `server` configuration variable to `server_url` to better align with other language agents ({pull}105[#105])
19+
* BREAKING: removed the old and unused urllib2-based HTTP transport, and renamed the urllib3 transport ({pull}107[#107])
1920

2021
[[release-v1.0.0.dev3]]
2122
[float]

docs/configuration.asciidoc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,14 @@ The URL must be fully qualified, including protocol (`http` or `https`) and port
7777

7878
|============
7979
| Environment | Django/Flask | Default
80-
| `ELASTIC_APM_TRANSPORT_CLASS` | `TRANSPORT_CLASS` | `elasticapm.transport.http_urllib3.AsyncUrllib3Transport`
80+
| `ELASTIC_APM_TRANSPORT_CLASS` | `TRANSPORT_CLASS` | `elasticapm.transport.http.AsyncTransport`
8181
|============
8282

8383

8484
The transport class to use when sending events to the APM server.
85-
The default `AsyncUrllib3Transport` uses a background thread to send data.
85+
The default `AsyncTransport` uses a background thread to send data.
8686
If your environment doesn't allow background threads, you can use
87-
`elasticapm.transport.http_urllib3.Urllib3Transport` instead.
87+
`elasticapm.transport.http.Transport` instead.
8888
Note however that this can have adverse effects on performance.
8989

9090

elasticapm/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,12 +300,12 @@ def _get_log_message(self, data):
300300
return message
301301

302302
def _get_transport(self, parsed_url):
303-
if self._transport_class.async_mode and is_master_process():
303+
if hasattr(self._transport_class, 'sync_transport') and is_master_process():
304304
# when in the master process, always use SYNC mode. This avoids
305305
# the danger of being forked into an inconsistent threading state
306306
self.logger.info('Sending message synchronously while in master '
307307
'process. PID: %s', os.getpid())
308-
return import_string(defaults.SYNC_TRANSPORT_CLASS)(parsed_url)
308+
return self._transport_class.sync_transport(parsed_url)
309309
if parsed_url not in self._transports:
310310
self._transports[parsed_url] = self._transport_class(parsed_url)
311311
return self._transports[parsed_url]

elasticapm/conf/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class Config(_ConfigBase):
143143
hostname = _ConfigValue('HOSTNAME', default=socket.gethostname())
144144
auto_log_stacks = _BoolConfigValue('AUTO_LOG_STACKS', default=True)
145145
keyword_max_length = _ConfigValue('KEYWORD_MAX_LENGTH', type=int, default=1024)
146-
transport_class = _ConfigValue('TRANSPORT_CLASS', default='elasticapm.transport.http_urllib3.AsyncUrllib3Transport',
146+
transport_class = _ConfigValue('TRANSPORT_CLASS', default='elasticapm.transport.http.AsyncTransport',
147147
required=True)
148148
processors = _ListConfigValue('PROCESSORS', default=[
149149
'elasticapm.processors.sanitize_stacktrace_locals',

elasticapm/conf/defaults.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,4 @@
7979
# Should elasticapm wrap middleware for better metrics detection
8080
INSTRUMENT_DJANGO_MIDDLEWARE = True
8181

82-
SYNC_TRANSPORT_CLASS = 'elasticapm.transport.http_urllib3.Urllib3Transport'
83-
84-
ASYNC_TRANSPORT_CLASS = 'elasticapm.transport.http_urllib3.AsyncUrllib3Transport'
85-
8682
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'

elasticapm/transport/__init__.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1 @@
11
# -*- coding: utf-8 -*-
2-
3-
from elasticapm.transport.http import AsyncHTTPTransport, HTTPTransport
4-
5-
6-
default = [HTTPTransport, AsyncHTTPTransport]

elasticapm/transport/asyncio.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,16 @@
55
from elasticapm.conf import defaults
66

77
from .base import TransportException
8-
from .http import HTTPTransport
8+
from .http_base import HTTPTransportBase
99

1010

11-
class AsyncioHTTPTransport(HTTPTransport):
11+
class AsyncioHTTPTransport(HTTPTransportBase):
1212
"""
1313
HTTP Transport ready for asyncio
1414
"""
1515

1616
def __init__(self, parsed_url):
17-
self.check_scheme(parsed_url)
18-
19-
self._parsed_url = parsed_url
20-
self._url = parsed_url.geturl()
17+
super(AsyncioHTTPTransport, self).__init__(parsed_url)
2118
loop = asyncio.get_event_loop()
2219
self.client = aiohttp.ClientSession(loop=loop)
2320

elasticapm/transport/http.py

Lines changed: 57 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,110 +1,86 @@
11
# -*- coding: utf-8 -*-
22
import logging
3-
import socket
3+
import os
44

5-
from elasticapm.conf import defaults
6-
from elasticapm.contrib.async_worker import AsyncWorker
7-
from elasticapm.transport.base import (AsyncTransport, Transport,
8-
TransportException)
9-
from elasticapm.utils.compat import HTTPError
10-
11-
try:
12-
from urllib2 import Request, urlopen
13-
except ImportError:
14-
from urllib.request import Request, urlopen
5+
import certifi
6+
import urllib3
7+
from urllib3.exceptions import MaxRetryError, TimeoutError
158

9+
from elasticapm.conf import defaults
10+
from elasticapm.transport.base import TransportException
11+
from elasticapm.transport.http_base import (AsyncHTTPTransportBase,
12+
HTTPTransportBase)
13+
from elasticapm.utils import compat
1614

17-
logger = logging.getLogger('elasticapm')
15+
logger = logging.getLogger(__name__)
1816

1917

20-
class HTTPTransport(Transport):
18+
class Transport(HTTPTransportBase):
2119

2220
scheme = ['http', 'https']
2321

2422
def __init__(self, parsed_url):
25-
self.check_scheme(parsed_url)
26-
27-
self._parsed_url = parsed_url
28-
self._url = parsed_url.geturl()
23+
kwargs = {
24+
'cert_reqs': 'CERT_REQUIRED',
25+
'ca_certs': certifi.where(),
26+
'block': True,
27+
}
28+
proxy_url = os.environ.get('HTTPS_PROXY', os.environ.get('HTTP_PROXY'))
29+
if proxy_url:
30+
self.http = urllib3.ProxyManager(proxy_url, **kwargs)
31+
else:
32+
self.http = urllib3.PoolManager(**kwargs)
33+
super(Transport, self).__init__(parsed_url)
2934

3035
def send(self, data, headers, timeout=None):
31-
"""
32-
Sends a request to a remote webserver using HTTP POST.
33-
34-
Returns the shortcut URL of the recorded error on Elastic APM
35-
"""
36-
req = Request(self._url, headers=headers)
3736
if timeout is None:
3837
timeout = defaults.TIMEOUT
3938
response = None
39+
40+
# ensure headers are byte strings
41+
headers = {k.encode('ascii') if isinstance(k, compat.text_type) else k:
42+
v.encode('ascii') if isinstance(v, compat.text_type) else v
43+
for k, v in headers.items()}
44+
if compat.PY2 and isinstance(self._url, compat.text_type):
45+
url = self._url.encode('utf-8')
46+
else:
47+
url = self._url
4048
try:
4149
try:
42-
response = urlopen(req, data, timeout)
43-
except TypeError:
44-
response = urlopen(req, data)
45-
except Exception as e:
46-
print_trace = True
47-
if isinstance(e, socket.timeout):
48-
message = (
49-
"Connection to APM Server timed out "
50-
"(url: %s, timeout: %d seconds)" % (self._url, timeout)
50+
response = self.http.urlopen(
51+
'POST', url, body=data, headers=headers, timeout=timeout, preload_content=False
5152
)
52-
elif isinstance(e, HTTPError):
53-
body = e.read()
54-
if e.code == 429: # rate-limited
53+
logger.info('Sent request, url=%s size=%.2fkb status=%s', url, len(data) / 1024.0, response.status)
54+
except Exception as e:
55+
print_trace = True
56+
if isinstance(e, MaxRetryError) and isinstance(e.reason, TimeoutError):
57+
message = (
58+
"Connection to APM Server timed out "
59+
"(url: %s, timeout: %d seconds)" % (self._url, timeout)
60+
)
61+
print_trace = False
62+
else:
63+
message = 'Unable to reach APM Server: %s (url: %s)' % (
64+
e, self._url
65+
)
66+
raise TransportException(message, data, print_trace=print_trace)
67+
body = response.read()
68+
if response.status >= 400:
69+
if response.status == 429: # rate-limited
5570
message = 'Temporarily rate limited: '
5671
print_trace = False
5772
else:
58-
message = 'Unable to reach APM Server: '
59-
message += '%s (url: %s, body: %s)' % (e, self._url, body)
60-
else:
61-
message = 'Unable to reach APM Server: %s (url: %s)' % (
62-
e, self._url
63-
)
64-
raise TransportException(message, data, print_trace=print_trace)
73+
message = 'HTTP %s: ' % response.status
74+
print_trace = True
75+
message += body.decode('utf8')
76+
raise TransportException(message, data, print_trace=print_trace)
77+
return response.getheader('Location')
6578
finally:
6679
if response:
6780
response.close()
6881

69-
return response.info().get('Location')
7082

71-
72-
class AsyncHTTPTransport(AsyncTransport, HTTPTransport):
83+
class AsyncTransport(AsyncHTTPTransportBase, Transport):
7384
scheme = ['http', 'https']
7485
async_mode = True
75-
76-
def __init__(self, parsed_url):
77-
super(AsyncHTTPTransport, self).__init__(parsed_url)
78-
if self._url.startswith('async+'):
79-
self._url = self._url[6:]
80-
self._worker = None
81-
82-
@property
83-
def worker(self):
84-
if not self._worker or not self._worker.is_alive():
85-
self._worker = AsyncWorker()
86-
return self._worker
87-
88-
def send_sync(self, data=None, headers=None, success_callback=None,
89-
fail_callback=None):
90-
try:
91-
url = HTTPTransport.send(self, data, headers)
92-
if callable(success_callback):
93-
success_callback(url=url)
94-
except Exception as e:
95-
if callable(fail_callback):
96-
fail_callback(exception=e)
97-
98-
def send_async(self, data, headers, success_callback=None,
99-
fail_callback=None):
100-
kwargs = {
101-
'data': data,
102-
'headers': headers,
103-
'success_callback': success_callback,
104-
'fail_callback': fail_callback,
105-
}
106-
self.worker.queue(self.send_sync, kwargs)
107-
108-
def close(self):
109-
if self._worker:
110-
self._worker.main_thread_terminated()
86+
sync_transport = Transport

elasticapm/transport/http_base.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# -*- coding: utf-8 -*-
2+
from elasticapm.contrib.async_worker import AsyncWorker
3+
from elasticapm.transport.base import AsyncTransport, Transport
4+
5+
6+
class HTTPTransportBase(Transport):
7+
scheme = ['http', 'https']
8+
9+
def __init__(self, parsed_url):
10+
self.check_scheme(parsed_url)
11+
12+
self._parsed_url = parsed_url
13+
self._url = parsed_url.geturl()
14+
15+
def send(self, data, headers, timeout=None):
16+
"""
17+
Sends a request to a remote webserver using HTTP POST.
18+
19+
Returns the shortcut URL of the recorded error on Elastic APM
20+
"""
21+
raise NotImplementedError()
22+
23+
24+
class AsyncHTTPTransportBase(AsyncTransport, HTTPTransportBase):
25+
scheme = ['http', 'https']
26+
async_mode = True
27+
sync_transport = HTTPTransportBase
28+
29+
def __init__(self, parsed_url):
30+
super(AsyncHTTPTransportBase, self).__init__(parsed_url)
31+
if self._url.startswith('async+'):
32+
self._url = self._url[6:]
33+
self._worker = None
34+
35+
@property
36+
def worker(self):
37+
if not self._worker or not self._worker.is_alive():
38+
self._worker = AsyncWorker()
39+
return self._worker
40+
41+
def send_sync(self, data=None, headers=None, success_callback=None,
42+
fail_callback=None):
43+
try:
44+
url = self.sync_transport.send(self, data, headers)
45+
if callable(success_callback):
46+
success_callback(url=url)
47+
except Exception as e:
48+
if callable(fail_callback):
49+
fail_callback(exception=e)
50+
51+
def send_async(self, data, headers, success_callback=None,
52+
fail_callback=None):
53+
kwargs = {
54+
'data': data,
55+
'headers': headers,
56+
'success_callback': success_callback,
57+
'fail_callback': fail_callback,
58+
}
59+
self.worker.queue(self.send_sync, kwargs)
60+
61+
def close(self):
62+
if self._worker:
63+
self._worker.main_thread_terminated()

0 commit comments

Comments
 (0)