Skip to content

Commit d490fbd

Browse files
authored
Merge pull request #176 from American-Institutes-for-Research/HEA-752/Dagster-GraphQL-API-is-intermittently-failing-with-a-ProtocolError-when-accessed-via-the-revproxy-Django-view
Hea 752/dagster graph ql api is intermittently failing with a protocol error when accessed via the revproxy django view
2 parents 15bf8bb + 300762d commit d490fbd

File tree

7 files changed

+221
-5
lines changed

7 files changed

+221
-5
lines changed

apps/common/consumers.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import asyncio
2+
import logging
3+
import os
4+
5+
import websockets
6+
from channels.exceptions import DenyConnection
7+
from channels.generic.websocket import AsyncWebsocketConsumer
8+
from django.contrib.auth.models import AnonymousUser
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class DagsterWebSocketProxyConsumer(AsyncWebsocketConsumer):
14+
15+
async def connect(self):
16+
logger.info(f"WebSocket connection attempt: {self.scope['path']}")
17+
18+
# Authentication check
19+
if isinstance(self.scope["user"], AnonymousUser):
20+
logger.error("Authentication required")
21+
raise DenyConnection("Authentication required")
22+
23+
perm = "common.access_dagster_ui"
24+
if not self.scope["user"].has_perm(perm):
25+
logger.error(
26+
f"User {self.scope['user'].username} lacks permission {perm} for accessing {self.scope['path']}"
27+
)
28+
raise DenyConnection("Permission denied")
29+
30+
logger.info(f"User {self.scope['user'].username} authenticated")
31+
32+
# Build upstream URL
33+
dagster_url = os.environ.get("DAGSTER_WEBSERVER_URL", "http://localhost:3000")
34+
dagster_prefix = os.environ.get("DAGSTER_WEBSERVER_PREFIX", "pipelines")
35+
36+
path = self.scope["path"]
37+
if path.startswith(f"/{dagster_prefix}/"):
38+
path = path[len(f"/{dagster_prefix}/") :]
39+
40+
# Convert http to ws
41+
if dagster_url.startswith("https://"):
42+
ws_url = dagster_url.replace("https://", "wss://", 1)
43+
else:
44+
ws_url = dagster_url.replace("http://", "ws://", 1)
45+
46+
# Build target URL
47+
if dagster_prefix:
48+
target_url = f"{ws_url}/{dagster_prefix}/{path}"
49+
else:
50+
target_url = f"{ws_url}/{path}"
51+
# Add query string
52+
if self.scope.get("query_string"):
53+
target_url += f"?{self.scope['query_string'].decode()}"
54+
55+
logger.info(f"Connecting to upstream: {target_url}")
56+
57+
# Get subprotocols from client
58+
subprotocols = self.scope.get("subprotocols", [])
59+
60+
try:
61+
self.websocket = await websockets.connect(
62+
target_url,
63+
max_size=10485760,
64+
ping_interval=20,
65+
subprotocols=subprotocols if subprotocols else None,
66+
)
67+
logger.info("Connected to upstream")
68+
except Exception as e:
69+
logger.error(f"Failed to connect: {e}")
70+
raise DenyConnection(f"Connection to upstream failed: {e}")
71+
72+
await self.accept(self.websocket.subprotocol)
73+
logger.info(f"Client accepted with subprotocol: {self.websocket.subprotocol}")
74+
75+
self.consumer_task = asyncio.create_task(self.consume_from_upstream())
76+
77+
async def disconnect(self, close_code):
78+
logger.info(f"Disconnecting with code {close_code}")
79+
if hasattr(self, "consumer_task"):
80+
self.consumer_task.cancel()
81+
try:
82+
await self.consumer_task
83+
except asyncio.CancelledError:
84+
pass
85+
if hasattr(self, "websocket"):
86+
await self.websocket.close()
87+
88+
async def receive(self, text_data=None, bytes_data=None):
89+
try:
90+
await self.websocket.send(bytes_data or text_data)
91+
except Exception as e:
92+
logger.error(f"Error forwarding to upstream: {e}")
93+
await self.close()
94+
95+
async def consume_from_upstream(self):
96+
try:
97+
async for message in self.websocket:
98+
if isinstance(message, bytes):
99+
await self.send(bytes_data=message)
100+
else:
101+
await self.send(text_data=message)
102+
except asyncio.CancelledError:
103+
pass
104+
except Exception as e:
105+
logger.error(f"Error consuming from upstream: {e}")
106+
await self.close()

apps/common/logging_filters.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import logging
2+
3+
4+
class SuppressWebSocketPings(logging.Filter):
5+
6+
def filter(self, record):
7+
suppress_phrases = [
8+
"sending keepalive ping",
9+
"received keepalive pong",
10+
"> PING",
11+
"< PONG",
12+
"% sending keepalive",
13+
"% received keepalive",
14+
"ASGI 'lifespan' protocol appears unsupported.",
15+
]
16+
17+
message = record.getMessage()
18+
19+
for phrase in suppress_phrases:
20+
if phrase in message:
21+
return False # Don't log this message
22+
23+
return True
24+
25+
26+
class SuppressRevProxyNoise(logging.Filter):
27+
28+
def filter(self, record):
29+
# Suppress these RevProxy messages
30+
suppress_phrases = [
31+
"ProxyView created",
32+
"Normalizing response headers",
33+
"Checking for invalid cookies",
34+
"Starting streaming HTTP Response",
35+
]
36+
37+
message = record.getMessage()
38+
39+
for phrase in suppress_phrases:
40+
if phrase in message:
41+
return False
42+
43+
return True

apps/common/routing.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from django.urls import re_path
2+
3+
from common.consumers import DagsterWebSocketProxyConsumer
4+
5+
websocket_urlpatterns = [
6+
# Route WebSocket connections for Dagster proxy
7+
re_path(r"^pipelines/(?P<path>.*)$", DagsterWebSocketProxyConsumer.as_asgi()),
8+
]

docker/app/run_django.sh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ echo Starting Gunicorn with DJANGO_SETTINGS_MODULE=${DJANGO_SETTINGS_MODULE}
4343
if [ x"$LAUNCHER" != x"" ]; then
4444
echo using ${LAUNCHER}
4545
fi
46-
gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.wsgi:application \
46+
gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.asgi:application \
4747
--name ${APP}${ENV} \
48+
--worker-class uvicorn.workers.UvicornWorker \
4849
--config $(dirname $(readlink -f "$0"))/gunicorn_config.py \
49-
$* 2>&1
50+
$* 2>&1

hea/asgi.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,22 @@
99

1010
import os
1111

12+
from channels.auth import AuthMiddlewareStack
13+
from channels.routing import ProtocolTypeRouter, URLRouter
14+
from channels.security.websocket import AllowedHostsOriginValidator
1215
from django.core.asgi import get_asgi_application
1316

1417
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings")
1518

16-
application = get_asgi_application()
19+
django_asgi_app = get_asgi_application()
20+
21+
# Import routing after Django setup
22+
from common.routing import websocket_urlpatterns # noqa: E402
23+
24+
application = ProtocolTypeRouter(
25+
{
26+
"http": django_asgi_app,
27+
# WebSocket requests handled by Channels consumers
28+
"websocket": AllowedHostsOriginValidator(AuthMiddlewareStack(URLRouter(websocket_urlpatterns))),
29+
}
30+
)

hea/settings/base.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
"rest_framework_gis",
110110
"revproxy",
111111
"corsheaders",
112+
"channels",
112113
]
113114
PROJECT_APPS = ["common", "metadata", "baseline"]
114115
INSTALLED_APPS = EXTERNAL_APPS + PROJECT_APPS
@@ -155,6 +156,8 @@
155156
"SEARCH_PARAM": "search",
156157
}
157158

159+
ASGI_APPLICATION = "hea.asgi.application"
160+
CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}
158161

159162
########## CORS CONFIGURATION
160163
# See: https://github.com/ottoyiu/django-cors-headers
@@ -250,6 +253,12 @@
250253
},
251254
"filters": {
252255
"require_debug_false": {"()": "django.utils.log.RequireDebugFalse"},
256+
"suppress_ws_pings": {
257+
"()": "common.logging_filters.SuppressWebSocketPings",
258+
},
259+
"suppress_revproxy_noise": {
260+
"()": "common.logging_filters.SuppressRevProxyNoise",
261+
},
253262
},
254263
"handlers": {
255264
"logfile": {
@@ -266,6 +275,7 @@
266275
"stream": sys.stdout,
267276
"class": "logging.StreamHandler",
268277
"formatter": env.str("LOG_FORMATTER", "standard"),
278+
"filters": ["suppress_ws_pings", "suppress_revproxy_noise"],
269279
},
270280
"mail_admins": {
271281
"level": "ERROR",
@@ -280,11 +290,43 @@
280290
"django.security": {"handlers": ["console", "logfile"], "level": "ERROR", "propagate": False},
281291
"factory": {"handlers": ["console", "logfile"], "level": "INFO"},
282292
"faker": {"handlers": ["console", "logfile"], "level": "INFO"},
283-
"luigi": {"level": "INFO"},
284-
"luigi-interface": {"level": "INFO"},
285293
"urllib3": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
286294
"common.models": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
287295
"common.signals": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
296+
"uvicorn": {
297+
"handlers": ["console"],
298+
"level": "INFO",
299+
"propagate": False,
300+
},
301+
"uvicorn.error": {
302+
"handlers": ["console"],
303+
"level": "DEBUG",
304+
"propagate": False,
305+
"filters": ["suppress_ws_pings"],
306+
},
307+
"uvicorn.access": {
308+
"handlers": ["console"],
309+
"level": "INFO",
310+
"propagate": False,
311+
},
312+
"revproxy": {
313+
"handlers": ["console"],
314+
"level": "INFO",
315+
"propagate": False,
316+
"filters": ["suppress_revproxy_noise"],
317+
},
318+
"revproxy.view": {
319+
"handlers": ["console"],
320+
"level": "INFO",
321+
"propagate": False,
322+
"filters": ["suppress_revproxy_noise"],
323+
},
324+
"revproxy.response": {
325+
"handlers": ["console"],
326+
"level": "INFO",
327+
"propagate": False,
328+
"filters": ["suppress_revproxy_noise"],
329+
},
288330
},
289331
# Keep root at DEBUG and use the `level` on the handler to control logging output,
290332
# so that additional handlers can be used to get additional detail, e.g. `common.resources.LoggingResourceMixin`

requirements/base.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# Do not pip freeze into this file. Add only the packages you need so pip can align dependencies.
2+
channels==4.3.1
23
dagster==1.11.7
34
dagster_aws==0.27.7
45
dagster-pipes==1.11.7
@@ -46,6 +47,7 @@ sqlparse==0.5.0
4647
tabulate==0.9.0
4748
# Need universal-pathlib > 0.2.0 for gdrivefs support
4849
universal-pathlib==0.2.1
50+
uvicorn==0.37.0
4951
whitenoise==6.4.0
5052
xlrd==2.0.1
5153
xlutils==2.0.0

0 commit comments

Comments
 (0)