Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions apps/common/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,22 @@ class CommonConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "common"
verbose_name = _("common")

def ready(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems very non-standard.

My reading around this is that it's not an error, even if we would get better performance if we increased the pool max_size.

Do we know if the message is caused by the regular proxy or by the websockets one?

I am tempted to suppress the log message, but otherwise leave it all alone for now. Perhaps we start a new MR based on the original one, but without these commits.

"""Patch urllib3 connection pool size on startup"""
from urllib3 import HTTPConnectionPool

original_init = HTTPConnectionPool.__init__

def patched_init(self, *args, **kwargs):
# Force larger pool size
kwargs["maxsize"] = kwargs.get("maxsize", 50)
if kwargs["maxsize"] < 50:
kwargs["maxsize"] = 50

original_init(self, *args, **kwargs)

# Apply patch
HTTPConnectionPool.__init__ = patched_init

print("urllib3 connection pool patched: maxsize=50")
104 changes: 104 additions & 0 deletions apps/common/consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import asyncio
import logging
import os

import websockets
from channels.exceptions import DenyConnection
from channels.generic.websocket import AsyncWebsocketConsumer
from django.contrib.auth.models import AnonymousUser

logger = logging.getLogger(__name__)


class DagsterWebSocketProxyConsumer(AsyncWebsocketConsumer):

async def connect(self):
logger.info(f"WebSocket connection attempt: {self.scope['path']}")

# Authentication check
if isinstance(self.scope["user"], AnonymousUser):
logger.error("Authentication required")
raise DenyConnection("Authentication required")

if not self.scope["user"].has_perm("common.access_dagster_ui"):
logger.error(f"User {self.scope['user'].username} lacks permission")
raise DenyConnection("Permission denied")

logger.info(f"User {self.scope['user'].username} authenticated")

# Build upstream URL
dagster_url = os.environ.get("DAGSTER_WEBSERVER_URL", "http://localhost:3000")
dagster_prefix = os.environ.get("DAGSTER_WEBSERVER_PREFIX", "")

path = self.scope["path"]
if path.startswith("/pipelines/"):
path = path[len("/pipelines/") :]

# Convert http to ws
if dagster_url.startswith("https://"):
ws_url = dagster_url.replace("https://", "wss://", 1)
else:
ws_url = dagster_url.replace("http://", "ws://", 1)

# Build target URL
if dagster_prefix:
target_url = f"{ws_url}/{dagster_prefix}/{path}"
else:
target_url = f"{ws_url}/{path}"

# Add query string
if self.scope.get("query_string"):
target_url += f"?{self.scope['query_string'].decode()}"

logger.info(f"Connecting to upstream: {target_url}")

# Get subprotocols from client
subprotocols = self.scope.get("subprotocols", [])

try:
self.websocket = await websockets.connect(
target_url,
max_size=2097152,
ping_interval=20,
subprotocols=subprotocols if subprotocols else None,
)
logger.info("Connected to upstream")
except Exception as e:
logger.error(f"Failed to connect: {e}")
raise DenyConnection(f"Connection to upstream failed: {e}")

await self.accept(self.websocket.subprotocol)
logger.info(f"Client accepted with subprotocol: {self.websocket.subprotocol}")

self.consumer_task = asyncio.create_task(self.consume_from_upstream())

async def disconnect(self, close_code):
logger.info(f"Disconnecting with code {close_code}")
if hasattr(self, "consumer_task"):
self.consumer_task.cancel()
try:
await self.consumer_task
except asyncio.CancelledError:
pass
if hasattr(self, "websocket"):
await self.websocket.close()

async def receive(self, text_data=None, bytes_data=None):
try:
await self.websocket.send(bytes_data or text_data)
except Exception as e:
logger.error(f"Error forwarding to upstream: {e}")
await self.close()

async def consume_from_upstream(self):
try:
async for message in self.websocket:
if isinstance(message, bytes):
await self.send(bytes_data=message)
else:
await self.send(text_data=message)
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"Error consuming from upstream: {e}")
await self.close()
8 changes: 8 additions & 0 deletions apps/common/routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.urls import re_path

from common.consumers import DagsterWebSocketProxyConsumer

websocket_urlpatterns = [
# Route WebSocket connections for Dagster proxy
re_path(r"^pipelines/(?P<path>.*)$", DagsterWebSocketProxyConsumer.as_asgi()),
]
5 changes: 3 additions & 2 deletions docker/app/run_django.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ echo Starting Gunicorn with DJANGO_SETTINGS_MODULE=${DJANGO_SETTINGS_MODULE}
if [ x"$LAUNCHER" != x"" ]; then
echo using ${LAUNCHER}
fi
gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.wsgi:application \
gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.asgi:application \
--name ${APP}${ENV} \
--worker-class uvicorn.workers.UvicornWorker \
--config $(dirname $(readlink -f "$0"))/gunicorn_config.py \
$* 2>&1
$* 2>&1
16 changes: 15 additions & 1 deletion hea/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,22 @@

import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings")

application = get_asgi_application()
django_asgi_app = get_asgi_application()

# Import routing after Django setup
from common.routing import websocket_urlpatterns # noqa: E402

application = ProtocolTypeRouter(
{
"http": django_asgi_app,
# WebSocket requests handled by Channels consumers
"websocket": AllowedHostsOriginValidator(AuthMiddlewareStack(URLRouter(websocket_urlpatterns))),
}
)
7 changes: 5 additions & 2 deletions hea/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
"rest_framework_gis",
"revproxy",
"corsheaders",
"channels",
]
PROJECT_APPS = ["common", "metadata", "baseline"]
INSTALLED_APPS = EXTERNAL_APPS + PROJECT_APPS
Expand Down Expand Up @@ -155,6 +156,8 @@
"SEARCH_PARAM": "search",
}

ASGI_APPLICATION = "hea.asgi.application"
CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}

########## CORS CONFIGURATION
# See: https://github.com/ottoyiu/django-cors-headers
Expand Down Expand Up @@ -280,11 +283,11 @@
"django.security": {"handlers": ["console", "logfile"], "level": "ERROR", "propagate": False},
"factory": {"handlers": ["console", "logfile"], "level": "INFO"},
"faker": {"handlers": ["console", "logfile"], "level": "INFO"},
"luigi": {"level": "INFO"},
"luigi-interface": {"level": "INFO"},
"urllib3": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
"common.models": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
"common.signals": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
"uvicorn.error": {"handlers": ["console"], "level": "WARNING", "propagate": False},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure that we want to lose all INFO messages. Just the overly chatty ones.

Would this be better using a custom logging filter to suppress specific messages?

We need to be certain that the logger is still capturing regular new page requests.

"uvicorn.access": {"handlers": ["console"], "level": "WARNING", "propagate": False},
},
# Keep root at DEBUG and use the `level` on the handler to control logging output,
# so that additional handlers can be used to get additional detail, e.g. `common.resources.LoggingResourceMixin`
Expand Down
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Do not pip freeze into this file. Add only the packages you need so pip can align dependencies.
channels==4.3.1
dagster==1.11.7
dagster_aws==0.27.7
dagster-pipes==1.11.7
Expand Down Expand Up @@ -46,6 +47,7 @@ sqlparse==0.5.0
tabulate==0.9.0
# Need universal-pathlib > 0.2.0 for gdrivefs support
universal-pathlib==0.2.1
uvicorn==0.37.0
whitenoise==6.4.0
xlrd==2.0.1
xlutils==2.0.0
Expand Down