Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions bcgov_arches_common/util/auth/oauth_token_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.conf import settings
from bcgov_arches_common.util.auth.token_store import save_token
from bcgov_arches_common.util.auth.oauth_session_control import log_user_out
from oauth2_provider.oauth2_backends import get_oauthlib_core

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -38,10 +39,37 @@ def __init__(self, get_response):
self.get_response = get_response
self.oauth_config = OAUTH_CONFIG

@staticmethod
def dot_access_token_is_valid(request) -> bool:
"""
Returns True if request includes a valid django-oauth-toolkit Bearer token.
Does not raise; safe to call from middleware.
"""
auth = request.headers.get("Authorization", "")
if not auth.lower().startswith("bearer "):
return False

# oauthlib core will validate signature/expiry/scope rules etc.
core = get_oauthlib_core()
valid, oauth2_req = core.verify_request(request, scopes=[])

if valid:
# Optional: make token/user info available downstream
request.oauth2_validated = True
request.access_token = getattr(oauth2_req, "access_token", None)
request.resource_owner = getattr(oauth2_req, "user", None)
print("User has a valid DOT access token")
return True

return False

def __call__(self, request):
if bypass_auth(request):
return self.get_response(request)

# If request has a valid DOT bearer token, do NOT redirect to HOME_PAGE
has_valid_dot_token = self.dot_access_token_is_valid(request)

if logger.isEnabledFor(logging.DEBUG):
expiry_timestamp = request.session.get_expiry_date().timestamp()
now = time.time()
Expand Down Expand Up @@ -84,11 +112,15 @@ def __call__(self, request):
log_user_out(request)
return redirect(UNAUTHORIZED_PAGE)
else:
if request.user.is_authenticated:
if request.user.is_authenticated and not has_valid_dot_token:
logger.warning(f"[Token] No token - logging user out.")
log_user_out(request)

if AUTH_REQUIRED and not request.user.is_authenticated:
if (
AUTH_REQUIRED
and not request.user.is_authenticated
and not has_valid_dot_token
):
return redirect(HOME_PAGE)

return self.get_response(request)
71 changes: 45 additions & 26 deletions bcgov_arches_common/views/map.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,60 @@
import urllib3
from arches.app.views.map import TileserverProxyView
from django.conf import settings
from bcgov_arches_common.views.outbound_proxy_mixin import OutboundProxyMixin


class BCTileserverLocalProxyView(TileserverProxyView):
upstream_urls = settings.BC_TILESERVER_URLS
class BCTileserverProxyView(TileserverProxyView, OutboundProxyMixin):
"""
Subclass of the TileserverProxyView that has multiple upstream servers.
- the BC_TILESERVER_URLS is a dict with source->upstream URL mapping
- the request URL must have the source parameter set. If not set it defaults to the parent upstream value
- the BC_TILESERVER_URLS is a dict with source->upstream URL mapping and outbound proxy configuration
- the request URL must have the source parameter set. If not set it defaults to "openmaps"
"""

def __init__(self, *args, **kwargs):
super(BCTileserverLocalProxyView, self).__init__(*args, **kwargs)
DEFAULT_SOURCE = "openmaps"
DEFAULT_CONFIG = {
"url": getattr(settings, "TILESERVER_URL", "https://openmaps.gov.bc.ca/"),
"use_outbound_proxy": False,
}

@property
def upstream_urls(self):
"""
Get upstream URLs from settings, ensuring the default source is always available.
"""
# Get configuration from settings or empty dict if not defined
urls = getattr(settings, "BC_TILESERVER_URLS", {})

# Ensure default source exists
if self.DEFAULT_SOURCE not in urls:
urls[self.DEFAULT_SOURCE] = self.DEFAULT_CONFIG

return urls

def get_request_headers(self):
proxy_source = self.request.GET.get("source", "")
# print("\tProxy source: %s" % proxy_source)
if proxy_source in self.upstream_urls:
self.upstream = self.upstream_urls[proxy_source]

return super(BCTileserverLocalProxyView, self).get_request_headers()
# Get all available sources
upstream_urls = self.upstream_urls

# Get source configuration with fallback to default source
source_config = (
upstream_urls[proxy_source]
if proxy_source in upstream_urls
else upstream_urls[self.DEFAULT_SOURCE]
)

class BCTileserverProxyView(BCTileserverLocalProxyView):
"""
Subclass of the TileserverProxyView that has multiple upstream servers.
- the BC_TILESERVER_URLS is a dict with source->upstream URL mapping
- the request URL must have the source parameter set. If not set it defaults to the parent upstream value
"""
# Update upstream URL from configuration
self.upstream = source_config.get("url")

# Configure the HTTP connection based on proxy setting
self.http = self.get_http_connection(
use_outbound_proxy=source_config.get("use_outbound_proxy", False)
)

return super(BCTileserverProxyView, self).get_request_headers()

def __init__(self, *args, **kwargs):
super(BCTileserverProxyView, self).__init__(*args, **kwargs)
# Setup outbound proxy if it is configured
if (
hasattr(settings, "TILESERVER_OUTBOUND_PROXY")
and settings.TILESERVER_OUTBOUND_PROXY
):
# print("Setting outbound proxy to %s" % settings.TILESERVER_OUTBOUND_PROXY)
self.http = urllib3.ProxyManager(settings.TILESERVER_OUTBOUND_PROXY)
def dispatch(self, request, *args, **kwargs):
# When hitting /bctileserver/ there is no <path:path> kwarg.
# Normalize to empty string so upstream becomes "/" not "None".
kwargs["path"] = kwargs.get("path", "")
return super().dispatch(request, *args, **kwargs)
26 changes: 26 additions & 0 deletions bcgov_arches_common/views/outbound_proxy_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# %%
import urllib3
from django.conf import settings


class OutboundProxyMixin:
"""
Mixin that adds outbound proxy functionality to a view.
This can be used by proxy views to route requests through an external proxy.
"""

def get_http_connection(self, use_outbound_proxy=False):
"""
Returns the appropriate HTTP connection based on whether
to use the outbound proxy or not.
"""
if (
use_outbound_proxy
and hasattr(settings, "TILESERVER_OUTBOUND_PROXY")
and settings.TILESERVER_OUTBOUND_PROXY
):
# Return proxy connection
return urllib3.ProxyManager(settings.TILESERVER_OUTBOUND_PROXY)
else:
# Return regular connection
return urllib3.PoolManager()
74 changes: 74 additions & 0 deletions tests/views/outbound_proxy_mixin_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from django.test import TestCase
from unittest.mock import patch, MagicMock
from django.conf import settings
import urllib3
from bcgov_arches_common.views.outbound_proxy_mixin import OutboundProxyMixin


class OutboundProxyMixinTests(TestCase):
def setUp(self):
"""
Set up the test environment. This method will run before every test.
"""
self.mixin = OutboundProxyMixin()

@patch("bcgov_arches_common.views.outbound_proxy_mixin.urllib3.ProxyManager")
@patch("bcgov_arches_common.views.outbound_proxy_mixin.urllib3.PoolManager")
def test_get_http_connection_with_outbound_proxy(
self, mock_pool_manager, mock_proxy_manager
):
"""
Test that get_http_connection uses ProxyManager when an outbound proxy is specified.
"""
settings.TILESERVER_OUTBOUND_PROXY = "http://example-proxy.com"

# Call the method
connection = self.mixin.get_http_connection(use_outbound_proxy=True)

# Assert that ProxyManager was called with the correct argument
mock_proxy_manager.assert_called_once_with(settings.TILESERVER_OUTBOUND_PROXY)
mock_pool_manager.assert_not_called()

# Assert that the returned connection is a ProxyManager instance
self.assertEqual(connection, mock_proxy_manager.return_value)

@patch("bcgov_arches_common.views.outbound_proxy_mixin.urllib3.PoolManager")
@patch("bcgov_arches_common.views.outbound_proxy_mixin.urllib3.ProxyManager")
def test_get_http_connection_without_outbound_proxy(
self, mock_proxy_manager, mock_pool_manager
):
"""
Test that get_http_connection uses PoolManager when no outbound proxy is specified.
"""
# Ensure TILESERVER_OUTBOUND_PROXY is not set
settings.TILESERVER_OUTBOUND_PROXY = None

# Call the method
connection = self.mixin.get_http_connection(use_outbound_proxy=False)

# Assert that PoolManager was called and ProxyManager was not
mock_pool_manager.assert_called_once()
mock_proxy_manager.assert_not_called()

# Assert that the returned connection is a PoolManager instance
self.assertEqual(connection, mock_pool_manager.return_value)

@patch("bcgov_arches_common.views.outbound_proxy_mixin.urllib3.PoolManager")
@patch("bcgov_arches_common.views.outbound_proxy_mixin.urllib3.ProxyManager")
def test_get_http_connection_with_proxy_config_disabled(
self, mock_proxy_manager, mock_pool_manager
):
"""
Test that get_http_connection ignores outbound proxy settings if use_outbound_proxy is False.
"""
settings.TILESERVER_OUTBOUND_PROXY = "http://example-proxy.com"

# Call the method
connection = self.mixin.get_http_connection(use_outbound_proxy=False)

# Ensure PoolManager was called instead of ProxyManager
mock_pool_manager.assert_called_once()
mock_proxy_manager.assert_not_called()

# Assert the connection is a PoolManager instance
self.assertEqual(connection, mock_pool_manager.return_value)