Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions apps/common/consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import asyncio
import logging
import os

import websockets
from channels.exceptions import DenyConnection
from channels.generic.websocket import AsyncWebsocketConsumer
from django.contrib.auth.models import AnonymousUser

logger = logging.getLogger(__name__)


class DagsterWebSocketProxyConsumer(AsyncWebsocketConsumer):

async def connect(self):
logger.info(f"WebSocket connection attempt: {self.scope['path']}")

# Authentication check
if isinstance(self.scope["user"], AnonymousUser):
logger.error("Authentication required")
raise DenyConnection("Authentication required")

if not self.scope["user"].has_perm("common.access_dagster_ui"):
logger.error(f"User {self.scope['user'].username} lacks permission")
raise DenyConnection("Permission denied")

logger.info(f"User {self.scope['user'].username} authenticated")

# Build upstream URL
dagster_url = os.environ.get("DAGSTER_WEBSERVER_URL", "http://localhost:3000")
dagster_prefix = os.environ.get("DAGSTER_WEBSERVER_PREFIX", "")

path = self.scope["path"]
if path.startswith("/pipelines/"):
path = path[len("/pipelines/") :]

# Convert http to ws
if dagster_url.startswith("https://"):
ws_url = dagster_url.replace("https://", "wss://", 1)
else:
ws_url = dagster_url.replace("http://", "ws://", 1)

# Build target URL
if dagster_prefix:
target_url = f"{ws_url}/{dagster_prefix}/{path}"
else:
target_url = f"{ws_url}/{path}"

# Add query string
if self.scope.get("query_string"):
target_url += f"?{self.scope['query_string'].decode()}"

logger.info(f"Connecting to upstream: {target_url}")

# Get subprotocols from client
subprotocols = self.scope.get("subprotocols", [])

try:
self.websocket = await websockets.connect(
target_url,
max_size=10485760,
ping_interval=20,
subprotocols=subprotocols if subprotocols else None,
)
logger.info("Connected to upstream")
except Exception as e:
logger.error(f"Failed to connect: {e}")
raise DenyConnection(f"Connection to upstream failed: {e}")

await self.accept(self.websocket.subprotocol)
logger.info(f"Client accepted with subprotocol: {self.websocket.subprotocol}")

self.consumer_task = asyncio.create_task(self.consume_from_upstream())

async def disconnect(self, close_code):
logger.info(f"Disconnecting with code {close_code}")
if hasattr(self, "consumer_task"):
self.consumer_task.cancel()
try:
await self.consumer_task
except asyncio.CancelledError:
pass
if hasattr(self, "websocket"):
await self.websocket.close()

async def receive(self, text_data=None, bytes_data=None):
try:
await self.websocket.send(bytes_data or text_data)
except Exception as e:
logger.error(f"Error forwarding to upstream: {e}")
await self.close()

async def consume_from_upstream(self):
try:
async for message in self.websocket:
if isinstance(message, bytes):
await self.send(bytes_data=message)
else:
await self.send(text_data=message)
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error consuming from upstream: {e}")
await self.close()
43 changes: 43 additions & 0 deletions apps/common/logging_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging


class SuppressWebSocketPings(logging.Filter):

def filter(self, record):
suppress_phrases = [
"sending keepalive ping",
"received keepalive pong",
"> PING",
"< PONG",
"% sending keepalive",
"% received keepalive",
"ASGI 'lifespan' protocol appears unsupported.",
]

message = record.getMessage()

for phrase in suppress_phrases:
if phrase in message:
return False # Don't log this message

return True


class SuppressRevProxyNoise(logging.Filter):

def filter(self, record):
# Suppress these RevProxy messages
suppress_phrases = [
"ProxyView created",
"Normalizing response headers",
"Checking for invalid cookies",
"Starting streaming HTTP Response",
]

message = record.getMessage()

for phrase in suppress_phrases:
if phrase in message:
return False

return True
8 changes: 8 additions & 0 deletions apps/common/routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.urls import re_path

from common.consumers import DagsterWebSocketProxyConsumer

websocket_urlpatterns = [
# Route WebSocket connections for Dagster proxy
re_path(r"^pipelines/(?P<path>.*)$", DagsterWebSocketProxyConsumer.as_asgi()),
]
5 changes: 3 additions & 2 deletions docker/app/run_django.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ echo Starting Gunicorn with DJANGO_SETTINGS_MODULE=${DJANGO_SETTINGS_MODULE}
if [ x"$LAUNCHER" != x"" ]; then
echo using ${LAUNCHER}
fi
gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.wsgi:application \
gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.asgi:application \
--name ${APP}${ENV} \
--worker-class uvicorn.workers.UvicornWorker \
--config $(dirname $(readlink -f "$0"))/gunicorn_config.py \
$* 2>&1
$* 2>&1
16 changes: 15 additions & 1 deletion hea/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,22 @@

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings")

application = get_asgi_application()
django_asgi_app = get_asgi_application()

# Import routing after Django setup
from common.routing import websocket_urlpatterns # noqa: E402

application = ProtocolTypeRouter(
{
"http": django_asgi_app,
# WebSocket requests handled by Channels consumers
"websocket": AllowedHostsOriginValidator(AuthMiddlewareStack(URLRouter(websocket_urlpatterns))),
}
)
46 changes: 44 additions & 2 deletions hea/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
"rest_framework_gis",
"revproxy",
"corsheaders",
"channels",
]
PROJECT_APPS = ["common", "metadata", "baseline"]
INSTALLED_APPS = EXTERNAL_APPS + PROJECT_APPS
Expand Down Expand Up @@ -155,6 +156,8 @@
"SEARCH_PARAM": "search",
}

ASGI_APPLICATION = "hea.asgi.application"
CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}

########## CORS CONFIGURATION
# See: https://github.com/ottoyiu/django-cors-headers
Expand Down Expand Up @@ -250,6 +253,12 @@
},
"filters": {
"require_debug_false": {"()": "django.utils.log.RequireDebugFalse"},
"suppress_ws_pings": {
"()": "common.logging_filters.SuppressWebSocketPings",
},
"suppress_revproxy_noise": {
"()": "common.logging_filters.SuppressRevProxyNoise",
},
},
"handlers": {
"logfile": {
Expand All @@ -266,6 +275,7 @@
"stream": sys.stdout,
"class": "logging.StreamHandler",
"formatter": env.str("LOG_FORMATTER", "standard"),
"filters": ["suppress_ws_pings", "suppress_revproxy_noise"],
},
"mail_admins": {
"level": "ERROR",
Expand All @@ -280,11 +290,43 @@
"django.security": {"handlers": ["console", "logfile"], "level": "ERROR", "propagate": False},
"factory": {"handlers": ["console", "logfile"], "level": "INFO"},
"faker": {"handlers": ["console", "logfile"], "level": "INFO"},
"luigi": {"level": "INFO"},
"luigi-interface": {"level": "INFO"},
"urllib3": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
"common.models": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
"common.signals": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
"uvicorn": {
"handlers": ["console"],
"level": "INFO",
"propagate": False,
},
"uvicorn.error": {
"handlers": ["console"],
"level": "DEBUG",
"propagate": False,
"filters": ["suppress_ws_pings"],
},
"uvicorn.access": {
"handlers": ["console"],
"level": "INFO",
"propagate": False,
},
"revproxy": {
"handlers": ["console"],
"level": "INFO",
"propagate": False,
"filters": ["suppress_revproxy_noise"],
},
"revproxy.view": {
"handlers": ["console"],
"level": "INFO",
"propagate": False,
"filters": ["suppress_revproxy_noise"],
},
"revproxy.response": {
"handlers": ["console"],
"level": "INFO",
"propagate": False,
"filters": ["suppress_revproxy_noise"],
},
},
# Keep root at DEBUG and use the `level` on the handler to control logging output,
# so that additional handlers can be used to get additional detail, e.g. `common.resources.LoggingResourceMixin`
Expand Down
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Do not pip freeze into this file. Add only the packages you need so pip can align dependencies.
channels==4.3.1
dagster==1.11.7
dagster_aws==0.27.7
dagster-pipes==1.11.7
Expand Down Expand Up @@ -46,6 +47,7 @@ sqlparse==0.5.0
tabulate==0.9.0
# Need universal-pathlib > 0.2.0 for gdrivefs support
universal-pathlib==0.2.1
uvicorn==0.37.0
whitenoise==6.4.0
xlrd==2.0.1
xlutils==2.0.0
Expand Down