263 lines
11 KiB
Python
263 lines
11 KiB
Python
import hmac
import logging
from typing import Optional, Dict, Any, Union

from channels.db import database_sync_to_async
from corsheaders.middleware import CorsMiddleware as BaseCorsMiddleware
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ValidationError
from django.http import JsonResponse, HttpRequest, HttpResponseBase
from django.utils.deprecation import MiddlewareMixin

from core.models import CustomerProfile, TeamProfile
|
|
|
|
# Module-level logger shared by the middleware classes defined in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ConditionalCorsMiddleware(BaseCorsMiddleware):
    """
    CORS middleware that skips CORS processing when requests come through Oathkeeper.

    This prevents duplicate CORS headers since Oathkeeper handles CORS for proxied requests.
    Only applies CORS for direct requests (development/testing scenarios).
    """

    def __call__(self, request: HttpRequest) -> HttpResponseBase:
        """
        Dispatch the request, bypassing CORS handling for gateway traffic.

        A request is treated as coming through Oathkeeper only when it carries
        an ``X-Oathkeeper-Secret`` header equal to ``settings.OATHKEEPER_SECRET``.
        Such requests go straight to the next handler; every other request is
        handed to the parent CORS middleware.
        """
        provided_secret = request.META.get('HTTP_X_OATHKEEPER_SECRET')
        expected_secret = getattr(settings, 'OATHKEEPER_SECRET', None)

        # If request has valid Oathkeeper secret, skip CORS processing
        if self._secrets_match(provided_secret, expected_secret):
            logger.debug(f"Skipping CORS processing for Oathkeeper request to {request.path}")
            return self.get_response(request)

        # For direct requests (development, testing), apply CORS normally
        logger.debug(f"Applying CORS processing for direct request to {request.path}")
        return super().__call__(request)

    @staticmethod
    def _secrets_match(provided: Any, expected: Any) -> bool:
        """
        Return True when both secrets are non-empty and equal.

        Uses ``hmac.compare_digest`` so the comparison time does not depend on
        how many leading characters of a guessed secret are correct.
        """
        if not provided or not expected:
            return False
        if not isinstance(provided, bytes):
            provided = str(provided).encode('utf-8')
        if not isinstance(expected, bytes):
            expected = str(expected).encode('utf-8')
        return hmac.compare_digest(provided, expected)
|
|
|
|
class OryHeaderAuthenticationMiddleware(MiddlewareMixin):
    """
    Strict middleware that requires Ory authentication headers AND Django profiles.
    All users MUST have a corresponding Django profile to use the application.

    Security: Verifies requests came from Oathkeeper via shared secret.
    """

    # Path prefixes that bypass header authentication entirely.
    _SKIP_PATH_PREFIXES = ('/admin/', '/health/', '/static/')

    # Profile types accepted in the X-User-Profile-Type header.
    _VALID_PROFILE_TYPES = ('team', 'customer')

    def __init__(self, get_response):
        self.get_response = get_response
        super().__init__(get_response)

    def process_request(self, request: HttpRequest) -> Optional[JsonResponse]:
        """
        Authenticate the request from gateway-supplied headers.

        Returns:
            ``None`` to let the request continue (either because the path is
            exempt or because authentication succeeded), or a ``JsonResponse``
            with status 401/403/500 that short-circuits the request.
        """
        # Skip authentication for specific paths if needed
        if self._should_skip_auth(request):
            request.user = AnonymousUser()
            return None

        # 🔒 SECURITY: Verify request came from Oathkeeper
        # getattr() so a missing setting reaches the explicit 500 branch below
        # instead of raising AttributeError.
        expected_secret = getattr(settings, 'OATHKEEPER_SECRET', None)
        if not expected_secret:
            logger.critical("OATHKEEPER_SECRET not configured in settings!")
            return JsonResponse(
                {"detail": "Server configuration error"},
                status=500
            )

        provided_secret = request.META.get('HTTP_X_OATHKEEPER_SECRET')
        if not self._secrets_match(provided_secret, expected_secret):
            logger.warning(
                f"Invalid or missing Oathkeeper secret from {request.META.get('REMOTE_ADDR')} "
                f"for path {request.path}"
            )
            return JsonResponse(
                {"detail": "Forbidden - requests must come through the API gateway"},
                status=403
            )

        # Extract required headers (now safe to trust these)
        user_id = request.META.get('HTTP_X_USER_ID')
        profile_type = request.META.get('HTTP_X_USER_PROFILE_TYPE')
        django_profile_id = request.META.get('HTTP_X_DJANGO_PROFILE_ID')

        # Reject if no user ID
        if not user_id:
            return self._unauthorized("Authentication required - no user ID provided")

        # Reject if no profile type
        if not profile_type:
            return self._unauthorized("Authentication required - no profile type provided")

        # Validate profile type
        if profile_type not in self._VALID_PROFILE_TYPES:
            return JsonResponse(
                {"detail": f"Invalid profile type: {profile_type}"},
                status=403
            )

        # Django profile ID is REQUIRED
        if not django_profile_id:
            return JsonResponse(
                {"detail": "Django profile is required to access this application"},
                status=403
            )

        # Get Django profile
        profile = self._get_profile_by_id(django_profile_id, profile_type)
        if not profile:
            return JsonResponse(
                {"detail": f"Django profile {django_profile_id} not found or type mismatch"},
                status=403
            )

        # Store Ory user information on request for additional context
        request.ory_user_id = user_id
        request.ory_user_email = request.META.get('HTTP_X_USER_EMAIL')
        request.ory_user_first_name = request.META.get('HTTP_X_USER_FIRST_NAME')
        request.ory_user_last_name = request.META.get('HTTP_X_USER_LAST_NAME')
        request.ory_user_phone = request.META.get('HTTP_X_USER_PHONE')
        request.ory_profile_type = profile_type

        # Attach the resolved profile to the request.
        # NOTE(review): request.user is not assigned on this path — confirm
        # whether profile.user is meant to be exposed as request.user.
        request.profile = profile
        return None

    @staticmethod
    def _unauthorized(detail: str) -> JsonResponse:
        """Build a 401 JSON response carrying a Bearer ``WWW-Authenticate`` challenge."""
        return JsonResponse(
            {"detail": detail},
            status=401,
            headers={"WWW-Authenticate": 'Bearer realm="api"'}
        )

    @staticmethod
    def _secrets_match(provided: Any, expected: Any) -> bool:
        """
        Return True when both secrets are non-empty and equal.

        Uses ``hmac.compare_digest`` so the comparison time does not depend on
        how many leading characters of a guessed secret are correct.
        """
        if not provided or not expected:
            return False
        if not isinstance(provided, bytes):
            provided = str(provided).encode('utf-8')
        if not isinstance(expected, bytes):
            expected = str(expected).encode('utf-8')
        return hmac.compare_digest(provided, expected)

    @staticmethod
    def _should_skip_auth(request: HttpRequest) -> bool:
        """Determine if authentication should be skipped for this request."""
        # Allow CORS preflight requests
        if request.method == 'OPTIONS':
            return True

        # str.startswith accepts a tuple of prefixes.
        return request.path.startswith(
            OryHeaderAuthenticationMiddleware._SKIP_PATH_PREFIXES
        )

    @staticmethod
    def _get_profile_by_id(profile_id: str, expected_type: str) -> Optional[Union[TeamProfile, CustomerProfile]]:
        """
        Get Django profile by ID and validate it matches the expected type.

        Returns None if the profile is not found, the type is unknown, or the
        ID cannot be converted to the model's primary-key type.
        """
        try:
            if expected_type == 'team':
                return TeamProfile.objects.select_related('user').get(id=profile_id)
            elif expected_type == 'customer':
                return CustomerProfile.objects.select_related('user').get(id=profile_id)
            else:
                logger.warning(f"Unknown profile type: {expected_type}")
                return None
        except (TeamProfile.DoesNotExist, CustomerProfile.DoesNotExist):
            logger.warning(f"Profile {profile_id} not found for type {expected_type}")
            return None
        except (ValueError, TypeError, ValidationError):
            # A malformed ID (e.g. non-numeric or not a valid UUID) makes the
            # ORM lookup raise instead of reporting "does not exist"; treat it
            # as not found so the caller answers 403 rather than crashing.
            logger.warning(f"Malformed profile ID {profile_id!r} for type {expected_type}")
            return None
|
|
|
|
|
|
class OryWebSocketAuthMiddleware:
    """
    WebSocket middleware that authenticates using Ory headers from the initial HTTP upgrade request.
    Validates that users have the required Django profile to access the application.

    Security: Verifies requests came from Oathkeeper via shared secret.
    """

    # Profile types accepted in the x-user-profile-type header.
    _VALID_PROFILE_TYPES = ('team', 'customer')

    def __init__(self, app: Any) -> None:
        self.app = app

    async def __call__(self, scope: Dict[str, Any], receive: Any, send: Any) -> Any:
        """
        Authenticate WebSocket scopes and delegate to the wrapped application.

        Non-WebSocket scopes are passed through untouched. For WebSocket
        scopes the connection is closed (and ``None`` returned) when the
        gateway secret is missing/invalid, when required Ory headers are
        absent, or when no matching profile exists; otherwise the profile and
        Ory user details are stored on ``scope`` before delegating.
        """
        if scope['type'] != 'websocket':
            return await self.app(scope, receive, send)

        # Extract headers from the initial HTTP upgrade request
        headers: Dict[bytes, bytes] = dict(scope.get('headers', []))

        # 🔒 SECURITY: Verify request came from Oathkeeper
        # getattr() so a missing setting reaches the explicit close below
        # instead of raising AttributeError.
        expected_secret = getattr(settings, 'OATHKEEPER_SECRET', None)
        if not expected_secret:
            logger.critical("OATHKEEPER_SECRET not configured for WebSocket!")
            return await self._reject(send, 1011)  # Internal error

        # Compare raw bytes: the header is still untrusted at this point, so
        # it must not be strictly decoded (invalid UTF-8 would raise).
        if not self._secrets_match(headers.get(b'x-oathkeeper-secret', b''), expected_secret):
            logger.warning("Invalid Oathkeeper secret for WebSocket connection")
            return await self._reject(send, 4403)  # Forbidden

        # Ory headers should be passed through Oathkeeper (now safe to trust)
        user_id = self._header(headers, b'x-user-id')
        profile_type = self._header(headers, b'x-user-profile-type')
        django_profile_id = self._header(headers, b'x-django-profile-id')

        # Reject connection if headers missing
        if not (user_id and profile_type and django_profile_id):
            logger.warning("Missing Ory headers for WebSocket connection")
            return await self._reject(send, 4401)  # Custom close code for unauthorized

        # Validate profile type
        if profile_type not in self._VALID_PROFILE_TYPES:
            logger.warning(f"Invalid profile type for WebSocket: {profile_type}")
            return await self._reject(send, 4403)  # Custom close code for forbidden

        # Fetch the profile from the database
        profile: Optional[Union[TeamProfile, CustomerProfile]] = await self._get_profile(
            django_profile_id, profile_type
        )

        # Reject connection if profile not found
        if not profile:
            logger.warning(f"Profile {django_profile_id} not found for WebSocket connection")
            return await self._reject(send, 4403)  # Custom close code for forbidden

        scope['profile'] = profile

        # Store Ory user information in scope for additional context
        scope['ory_user_id'] = user_id
        scope['ory_user_email'] = self._header(headers, b'x-user-email')
        scope['ory_user_first_name'] = self._header(headers, b'x-user-first-name')
        scope['ory_user_last_name'] = self._header(headers, b'x-user-last-name')
        scope['ory_user_phone'] = self._header(headers, b'x-user-phone')
        scope['ory_profile_type'] = profile_type

        return await self.app(scope, receive, send)

    @staticmethod
    async def _reject(send: Any, code: int) -> None:
        """Send a ``websocket.close`` event with the given close code."""
        await send({
            'type': 'websocket.close',
            'code': code,
        })
        return None

    @staticmethod
    def _header(headers: Dict[bytes, bytes], name: bytes) -> str:
        """
        Return header ``name`` as text, or ``''`` when it is absent.

        Invalid UTF-8 bytes are replaced rather than raising, so a malformed
        header value cannot crash the middleware.
        """
        return headers.get(name, b'').decode('utf-8', errors='replace')

    @staticmethod
    def _secrets_match(provided: Any, expected: Any) -> bool:
        """
        Return True when both secrets are non-empty and equal.

        Uses ``hmac.compare_digest`` so the comparison time does not depend on
        how many leading characters of a guessed secret are correct.
        """
        if not provided or not expected:
            return False
        if not isinstance(provided, bytes):
            provided = str(provided).encode('utf-8')
        if not isinstance(expected, bytes):
            expected = str(expected).encode('utf-8')
        return hmac.compare_digest(provided, expected)

    @database_sync_to_async
    def _get_profile(self, profile_id: str, profile_type: str) -> Optional[Union[TeamProfile, CustomerProfile]]:
        """
        Get Django profile by ID and validate it matches the expected type.

        Returns None if the profile is not found, the type is unknown, or the
        ID cannot be converted to the model's primary-key type.
        """
        try:
            if profile_type == 'team':
                return TeamProfile.objects.select_related('user').get(id=profile_id)
            elif profile_type == 'customer':
                return CustomerProfile.objects.select_related('user').get(id=profile_id)
        except (TeamProfile.DoesNotExist, CustomerProfile.DoesNotExist):
            logger.warning(f"Profile {profile_id} not found for type {profile_type}")
            return None
        except (ValueError, TypeError, ValidationError):
            # A malformed ID (e.g. non-numeric or not a valid UUID) makes the
            # ORM lookup raise instead of reporting "does not exist"; treat it
            # as not found so the connection is closed cleanly.
            logger.warning(f"Malformed profile ID {profile_id!r} for type {profile_type}")
            return None
        return None
|