diff --git a/apps/common/consumers.py b/apps/common/consumers.py new file mode 100644 index 0000000..5ac78e1 --- /dev/null +++ b/apps/common/consumers.py @@ -0,0 +1,104 @@ +import asyncio +import logging +import os + +import websockets +from channels.exceptions import DenyConnection +from channels.generic.websocket import AsyncWebsocketConsumer +from django.contrib.auth.models import AnonymousUser + +logger = logging.getLogger(__name__) + + +class DagsterWebSocketProxyConsumer(AsyncWebsocketConsumer): + + async def connect(self): + logger.info(f"WebSocket connection attempt: {self.scope['path']}") + + # Authentication check + if isinstance(self.scope["user"], AnonymousUser): + logger.error("Authentication required") + raise DenyConnection("Authentication required") + + if not self.scope["user"].has_perm("common.access_dagster_ui"): + logger.error(f"User {self.scope['user'].username} lacks permission") + raise DenyConnection("Permission denied") + + logger.info(f"User {self.scope['user'].username} authenticated") + + # Build upstream URL + dagster_url = os.environ.get("DAGSTER_WEBSERVER_URL", "http://localhost:3000") + dagster_prefix = os.environ.get("DAGSTER_WEBSERVER_PREFIX", "") + + path = self.scope["path"] + if path.startswith("/pipelines/"): + path = path[len("/pipelines/") :] + + # Convert http to ws + if dagster_url.startswith("https://"): + ws_url = dagster_url.replace("https://", "wss://", 1) + else: + ws_url = dagster_url.replace("http://", "ws://", 1) + + # Build target URL + if dagster_prefix: + target_url = f"{ws_url}/{dagster_prefix}/{path}" + else: + target_url = f"{ws_url}/{path}" + + # Add query string + if self.scope.get("query_string"): + target_url += f"?{self.scope['query_string'].decode()}" + + logger.info(f"Connecting to upstream: {target_url}") + + # Get subprotocols from client + subprotocols = self.scope.get("subprotocols", []) + + try: + self.websocket = await websockets.connect( + target_url, + max_size=10485760, + ping_interval=20, + subprotocols=subprotocols if subprotocols else None, + ) + logger.info("Connected to upstream") + except Exception as e: + logger.error(f"Failed to connect: {e}") + raise DenyConnection(f"Connection to upstream failed: {e}") + + await self.accept(self.websocket.subprotocol) + logger.info(f"Client accepted with subprotocol: {self.websocket.subprotocol}") + + self.consumer_task = asyncio.create_task(self.consume_from_upstream()) + + async def disconnect(self, close_code): + logger.info(f"Disconnecting with code {close_code}") + if hasattr(self, "consumer_task"): + self.consumer_task.cancel() + try: + await self.consumer_task + except asyncio.CancelledError: + pass + if hasattr(self, "websocket"): + await self.websocket.close() + + async def receive(self, text_data=None, bytes_data=None): + try: + await self.websocket.send(bytes_data or text_data) + except Exception as e: + logger.error(f"Error forwarding to upstream: {e}") + await self.close() + + async def consume_from_upstream(self): + try: + async for message in self.websocket: + if isinstance(message, bytes): + await self.send(bytes_data=message) + else: + await self.send(text_data=message) + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error consuming from upstream: {e}") + await self.close() diff --git a/apps/common/logging_filters.py b/apps/common/logging_filters.py new file mode 100644 index 0000000..66ce154 --- /dev/null +++ b/apps/common/logging_filters.py @@ -0,0 +1,43 @@ +import logging + + +class SuppressWebSocketPings(logging.Filter): + + def filter(self, record): + suppress_phrases = [ + "sending keepalive ping", + "received keepalive pong", + "> PING", + "< PONG", + "% sending keepalive", + "% received keepalive", + "ASGI 'lifespan' protocol appears unsupported.", + ] + + message = record.getMessage() + + for phrase in suppress_phrases: + if phrase in message: + return False # Don't log this message + + return True + + +class SuppressRevProxyNoise(logging.Filter): + + def filter(self, record): + # Suppress these RevProxy messages + suppress_phrases = [ + "ProxyView created", + "Normalizing response headers", + "Checking for invalid cookies", + "Starting streaming HTTP Response", + ] + + message = record.getMessage() + + for phrase in suppress_phrases: + if phrase in message: + return False + + return True diff --git a/apps/common/routing.py b/apps/common/routing.py new file mode 100644 index 0000000..84eee93 --- /dev/null +++ b/apps/common/routing.py @@ -0,0 +1,8 @@ +from django.urls import re_path + +from common.consumers import DagsterWebSocketProxyConsumer + +websocket_urlpatterns = [ + # Route WebSocket connections for Dagster proxy + re_path(r"^pipelines/(?P.*)$", DagsterWebSocketProxyConsumer.as_asgi()), +] diff --git a/docker/app/run_django.sh b/docker/app/run_django.sh index e6f2b48..63ffef9 100755 --- a/docker/app/run_django.sh +++ b/docker/app/run_django.sh @@ -43,7 +43,8 @@ echo Starting Gunicorn with DJANGO_SETTINGS_MODULE=${DJANGO_SETTINGS_MODULE} if [ x"$LAUNCHER" != x"" ]; then echo using ${LAUNCHER} fi -gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.wsgi:application \ +gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.asgi:application \ --name ${APP}${ENV} \ + --worker-class uvicorn.workers.UvicornWorker \ --config $(dirname $(readlink -f "$0"))/gunicorn_config.py \ - $* 2>&1 + $* 2>&1 \ No newline at end of file diff --git a/hea/asgi.py b/hea/asgi.py index 38778e6..dd48af3 100644 --- a/hea/asgi.py +++ b/hea/asgi.py @@ -9,8 +9,22 @@ import os +from channels.auth import AuthMiddlewareStack +from channels.routing import ProtocolTypeRouter, URLRouter +from channels.security.websocket import AllowedHostsOriginValidator from django.core.asgi import get_asgi_application os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings") -application = get_asgi_application() +django_asgi_app = get_asgi_application() + +# Import routing after Django setup +from common.routing import websocket_urlpatterns # noqa: E402 + +application = ProtocolTypeRouter( + { + "http": django_asgi_app, + # WebSocket requests handled by Channels consumers + "websocket": AllowedHostsOriginValidator(AuthMiddlewareStack(URLRouter(websocket_urlpatterns))), + } +) diff --git a/hea/settings/base.py b/hea/settings/base.py index f27a8f4..8d87e96 100644 --- a/hea/settings/base.py +++ b/hea/settings/base.py @@ -109,6 +109,7 @@ "rest_framework_gis", "revproxy", "corsheaders", + "channels", ] PROJECT_APPS = ["common", "metadata", "baseline"] INSTALLED_APPS = EXTERNAL_APPS + PROJECT_APPS @@ -155,6 +156,8 @@ "SEARCH_PARAM": "search", } +ASGI_APPLICATION = "hea.asgi.application" +CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}} ########## CORS CONFIGURATION # See: https://github.com/ottoyiu/django-cors-headers @@ -250,6 +253,12 @@ }, "filters": { "require_debug_false": {"()": "django.utils.log.RequireDebugFalse"}, + "suppress_ws_pings": { + "()": "common.logging_filters.SuppressWebSocketPings", + }, + "suppress_revproxy_noise": { + "()": "common.logging_filters.SuppressRevProxyNoise", + }, }, "handlers": { "logfile": { @@ -266,6 +275,7 @@ "stream": sys.stdout, "class": "logging.StreamHandler", "formatter": env.str("LOG_FORMATTER", "standard"), + "filters": ["suppress_ws_pings", "suppress_revproxy_noise"], }, "mail_admins": { "level": "ERROR", @@ -280,11 +290,43 @@ "django.security": {"handlers": ["console", "logfile"], "level": "ERROR", "propagate": False}, "factory": {"handlers": ["console", "logfile"], "level": "INFO"}, "faker": {"handlers": ["console", "logfile"], "level": "INFO"}, - "luigi": {"level": "INFO"}, - "luigi-interface": {"level": "INFO"}, "urllib3": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, "common.models": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, "common.signals": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False}, + "uvicorn": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + }, + "uvicorn.error": { + "handlers": ["console"], + "level": "DEBUG", + "propagate": False, + "filters": ["suppress_ws_pings"], + }, + "uvicorn.access": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + }, + "revproxy": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + "filters": ["suppress_revproxy_noise"], + }, + "revproxy.view": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + "filters": ["suppress_revproxy_noise"], + }, + "revproxy.response": { + "handlers": ["console"], + "level": "INFO", + "propagate": False, + "filters": ["suppress_revproxy_noise"], + }, }, # Keep root at DEBUG and use the `level` on the handler to control logging output, # so that additional handlers can be used to get additional detail, e.g. `common.resources.LoggingResourceMixin` diff --git a/requirements/base.txt b/requirements/base.txt index 785d848..5d34d56 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -1,4 +1,5 @@ # Do not pip freeze into this file. Add only the packages you need so pip can align dependencies. +channels==4.3.1 dagster==1.11.7 dagster_aws==0.27.7 dagster-pipes==1.11.7 @@ -46,6 +47,7 @@ sqlparse==0.5.0 tabulate==0.9.0 # Need universal-pathlib > 0.2.0 for gdrivefs support universal-pathlib==0.2.1 +uvicorn==0.37.0 whitenoise==6.4.0 xlrd==2.0.1 xlutils==2.0.0