diff --git a/README.md b/README.md
index 347f4c48..afbb7605 100644
--- a/README.md
+++ b/README.md
@@ -384,15 +384,16 @@ The following atoms are available for use:
 
 | identifier | description |
 | --- | --- |
 | addr | Client remote address |
 | time | Datetime of the request |
 | dt_ms | Request duration in ms |
 | status | HTTP response status |
 | path | Request path (without query string) |
 | query\_string | Request query string |
 | method | Request HTTP method |
+| {header}i | Request HTTP header |
 | scheme | Request scheme |
 | protocol | HTTP protocol version |
-
+| rbl | HTTP response body length (WSGI only) |
 
 ### Workers and threads
@@ -402,12 +403,12 @@ Granian offers different options to configure the number of workers and threads
 
 - **runtime threads**: the number of Rust threads per worker that will perform network I/O
 - **runtime blocking threads**: the number of Rust threads per worker involved in blocking operations. The main role of these threads is dealing with blocking I/O – like file system operations.
 
-In general, Granian will try its best to automatically pick proper values for the threading configuration, leaving to you the responsibility to choose the number of workers you need.
+In general, Granian will try its best to automatically pick proper values for the threading configuration, leaving you with the responsibility of choosing the number of workers you need.
 There is no *golden rule* here, as these numbers will vastly depend on both your application's behavior and the deployment target, but we can list some suggestions:
 
 - matching the number of CPU cores for the workers is generally the best starting point; in containerized environments like Docker or Kubernetes, though, it is best to have 1 worker per container and scale your containers using the relevant orchestrator;
 - the default number of **runtime threads** and **runtime blocking threads** is fine for the vast majority of applications out there; you might want to increase the first for applications dealing with several concurrently opened websockets or if you primarily use HTTP/2, and lower the second only if you serve the same few files to a lot of connections;
 
-In regards of blocking threads, the option is irrelevant on asynchronous protocols, as all the interop will happen with the AsyncIO event loop which will also be the one holding the GIL for the vast majority of the time, and thus the value is fixed to a single thread; on synchronous protocols like WSGI instead, it will be the maximum amount of threads interacting – and thus trying to acquire the GIL – with your application code. All those threads will be spawned on-demand depending on the amount of concurrency, and they'll be shutdown after the amount of inactivity time specified with the relevant setting.
+Regarding blocking threads, the option is irrelevant on asynchronous protocols: all the interop happens with the AsyncIO event loop, which also holds the GIL for the vast majority of the time, and thus the value is fixed to a single thread. On synchronous protocols like WSGI, instead, it is the maximum number of threads interacting – and thus trying to acquire the GIL – with your application code. All those threads are spawned on demand depending on the amount of concurrency, and they are shut down after the inactivity time specified with the relevant setting.
 In general, unless you have a very specific reason to do so (for example, if your application has an average response time in the order of milliseconds, a very limited number of blocking threads usually delivers better throughput), you should avoid tuning this thread pool size and instead configure a backpressure value that suits your needs. In that regard, please check the next section.
 
 Also, **you should generally avoid configuring workers and threads based on numbers suggested for other servers**, as Granian's architecture is quite different from projects like Gunicorn or Uvicorn.
@@ -461,7 +462,7 @@ Since version 2.0 Granian supports free-threaded Python. While the installation
 
 > **Note:** if for any reason the GIL gets enabled on the free-threaded build, Granian will refuse to start. This means you can't use the free-threaded build on GIL-enabled interpreters.
 
-While for asynchronous protocols nothing really changes in terms of workers and threads configuration, as the scaling will be still driven by the number of AsyncIO event loops running (so the same rules for GIL workers apply), on synchronous protocols like WSGI every GIL-related limitation is theoretically absent.
+While for asynchronous protocols nothing really changes in terms of workers and threads configuration, as scaling will still be driven by the number of AsyncIO event loops running (so the same rules as for GIL workers apply), on synchronous protocols like WSGI every GIL-related limitation is theoretically absent. The general rules about I/O-bound vs CPU-bound load still apply, but at the time of writing there's not enough data to make suggestions on workers and threads tuning in free-threaded Python land, and thus you will need to experiment with those values depending on your specific workload.
 ## Customising Granian
diff --git a/granian/asgi.py b/granian/asgi.py
index d235f944..bb0d23d4 100644
--- a/granian/asgi.py
+++ b/granian/asgi.py
@@ -24,7 +24,11 @@ def __init__(self, callable):
     async def handle(self):
         try:
             await self.callable(
-                {'type': 'lifespan', 'asgi': {'version': '3.0', 'spec_version': '2.3'}, 'state': self.state},
+                {
+                    'type': 'lifespan',
+                    'asgi': {'version': '3.0', 'spec_version': '2.3'},
+                    'state': self.state,
+                },
                 self.receive,
                 self.send,
             )
@@ -140,16 +144,20 @@ def _build_access_logger(fmt):
     logger = log_request_builder(fmt)
 
     def access_log(t, scope, resp_code):
+        atoms = {
+            'addr_remote': scope['client'][0],
+            'protocol': 'HTTP/' + scope['http_version'],
+            'path': scope['path'],
+            'qs': scope['query_string'],
+            'method': scope.get('method', '-'),
+            'scheme': scope['scheme'],
+            'response_body_length': scope.get('LENGTH', '-'),
+        }
+        request_headers = {key.decode('utf-8'): value.decode('utf-8') for key, value in scope['headers']}
+        atoms.update({'{%s}i' % k: v for k, v in request_headers.items()})
         logger(
             t,
-            {
-                'addr_remote': scope['client'][0],
-                'protocol': 'HTTP/' + scope['http_version'],
-                'path': scope['path'],
-                'qs': scope['query_string'],
-                'method': scope.get('method', '-'),
-                'scheme': scope['scheme'],
-            },
+            atoms,
             resp_code,
         )
diff --git a/granian/log.py b/granian/log.py
index 9fb820eb..6cae823c 100644
--- a/granian/log.py
+++ b/granian/log.py
@@ -7,6 +7,28 @@
 from typing import Any, Dict, Optional
 
 
+class SafeAtoms(dict):
+    """Log atoms mapping rendering any missing atom as '-'."""
+
+    def __init__(self, atoms):
+        dict.__init__(self)
+        for key, value in atoms.items():
+            # escape double quotes so atom values can be safely quoted in log formats
+            if isinstance(value, str):
+                self[key] = value.replace('"', '\\"')
+            else:
+                self[key] = value
+
+    def __getitem__(self, k):
+        # '{header}i' atoms are stored with lowercase keys
+        if k.startswith('{'):
+            k = k.lower()
+        # a missing atom renders as '-' instead of raising KeyError
+        if k in self:
+            return super().__getitem__(k)
+        return '-'
+
+
 class LogLevels(str, Enum):
     critical = 'critical'
     error = 'error'
@@ -43,8 +65,16 @@ class LogLevels(str,
Enum):
         },
     },
     'handlers': {
-        'console': {'formatter': 'generic', 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout'},
-        'access': {'formatter': 'access', 'class': 'logging.StreamHandler', 'stream': 'ext://sys.stdout'},
+        'console': {
+            'formatter': 'generic',
+            'class': 'logging.StreamHandler',
+            'stream': 'ext://sys.stdout',
+        },
+        'access': {
+            'formatter': 'access',
+            'class': 'logging.StreamHandler',
+            'stream': 'ext://sys.stdout',
+        },
     },
     'loggers': {
         '_granian': {'handlers': ['console'], 'level': 'INFO', 'propagate': False},
@@ -80,19 +110,26 @@ def log_request_builder(fmt):
     def log_request(rtime, req, res_code):
         dt = time.time() - rtime
         rdt = datetime.datetime.fromtimestamp(rtime, tz=local_tz)
+        atoms_context = {
+            'addr': req.pop('addr_remote'),
+            'time': rdt.strftime('%Y-%m-%d %H:%M:%S %z'),
+            'dt_ms': dt * 1000,
+            'status': res_code,
+            'path': req.pop('path'),
+            'query_string': req.pop('qs'),
+            'method': req.pop('method'),
+            'scheme': req.pop('scheme'),
+            'protocol': req.pop('protocol'),
+            # response body length (set by the WSGI protocol only)
+            'rbl': req.pop('response_body_length'),
+        }
+        # whatever is left in `req` are '{header}i' atoms; merge them before
+        # building SafeAtoms so header values get the same quote escaping
+        atoms_context.update(req)
+        context = SafeAtoms(atoms_context)
         access_logger.info(
             fmt,
-            {
-                'addr': req['addr_remote'],
-                'time': rdt.strftime('%Y-%m-%d %H:%M:%S %z'),
-                'dt_ms': dt * 1000,
-                'status': res_code,
-                'path': req['path'],
-                'query_string': req['qs'],
-                'method': req['method'],
-                'scheme': req['scheme'],
-                'protocol': req['protocol'],
-            },
+            context,
         )
 
     return log_request
diff --git a/granian/wsgi.py b/granian/wsgi.py
index f6a06202..f982f561 100644
--- a/granian/wsgi.py
+++ b/granian/wsgi.py
@@ -1,4 +1,6 @@
+import itertools
 import os
+import re
 import sys
 import time
 from functools import wraps
@@ -51,8 +53,15 @@ def _runner(proto, scope):
     rv = callback(scope, resp)
 
     if isinstance(rv, list):
+        scope['LENGTH'] = len(b''.join(rv))
         proto.response_bytes(resp.status, resp.headers, b''.join(rv))
     else:
+        # consuming the tee'd copy computes the body length, but it also makes
+        # tee buffer the whole response in memory: streamed responses are
+        # fully materialized before being handed to response_iter below
+        rv, rv_copy = itertools.tee(rv)
+        scope['LENGTH'] = sum(len(chunk) for chunk in rv_copy)
+        del rv_copy
         proto.response_iter(resp.status, resp.headers, ResponseIterWrap(rv))
 
     return resp.status
@@ -77,16 +86,22 @@ def _build_access_logger(fmt):
     logger = log_request_builder(fmt)
 
     def access_log(t, scope, resp_code):
+        atoms = {
+            'addr_remote': scope['REMOTE_ADDR'].rsplit(':', 1)[0],
+            'protocol': scope['SERVER_PROTOCOL'],
+            'path': scope['PATH_INFO'],
+            'qs': scope['QUERY_STRING'],
+            'method': scope['REQUEST_METHOD'],
+            'scheme': scope['wsgi.url_scheme'],
+            'response_body_length': scope['LENGTH'],
+        }
+        request_headers = {key: value for key, value in scope.items() if key.startswith('HTTP_')}
+        # normalize 'HTTP_USER_AGENT' to 'user-agent' so '{header}i' atoms use
+        # the same hyphenated names on both the ASGI and WSGI protocols
+        atoms.update({'{%s}i' % re.match(r'HTTP_(.*)', k).group(1).replace('_', '-').lower(): v for k, v in request_headers.items()})
         logger(
             t,
-            {
-                'addr_remote': scope['REMOTE_ADDR'].rsplit(':', 1)[0],
-                'protocol': scope['SERVER_PROTOCOL'],
-                'path': scope['PATH_INFO'],
-                'qs': scope['QUERY_STRING'],
-                'method': scope['REQUEST_METHOD'],
-                'scheme': scope['wsgi.url_scheme'],
-            },
+            atoms,
             resp_code,
         )
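As a quick sanity check on the atoms this PR introduces, the `SafeAtoms` lookup can be exercised standalone. The snippet below copies the class from the `granian/log.py` diff and renders a hypothetical access log format string; the format, atom values, and the `x-missing` header are illustrative examples, not Granian defaults:

```python
# Standalone copy of the SafeAtoms mapping from granian/log.py in this PR:
# string atom values are quote-escaped, and any missing atom renders as '-'.
class SafeAtoms(dict):
    def __init__(self, atoms):
        dict.__init__(self)
        for key, value in atoms.items():
            self[key] = value.replace('"', '\\"') if isinstance(value, str) else value

    def __getitem__(self, k):
        # '{header}i' atoms are stored with lowercase keys
        if k.startswith('{'):
            k = k.lower()
        return super().__getitem__(k) if k in self else '-'


atoms = SafeAtoms({
    'addr': '127.0.0.1',
    'method': 'GET',
    'path': '/ping',
    'status': 200,
    'rbl': 4,                       # response body length ('rbl' atom, WSGI only)
    '{user-agent}i': 'curl/8.5.0',  # request header atom ('{header}i')
})

# hypothetical format string using the new atoms; '%(...)s' lookups go through
# SafeAtoms.__getitem__, so unknown atoms degrade to '-' instead of raising
fmt = '%(addr)s "%(method)s %(path)s" %(status)s %(rbl)s "%({User-Agent}i)s" "%({x-missing}i)s"'
print(fmt % atoms)
# → 127.0.0.1 "GET /ping" 200 4 "curl/8.5.0" "-"
```

Note that `{header}i` lookups are case-insensitive on the header name (`{User-Agent}i` resolves to the stored `{user-agent}i` key), which matches the lowercase keys produced by both the ASGI and WSGI access loggers in the diff.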