import logging from typing import Optional, Dict, Any, Union from channels.db import database_sync_to_async from corsheaders.middleware import CorsMiddleware as BaseCorsMiddleware from django.conf import settings from django.contrib.auth.models import AnonymousUser from django.http import JsonResponse, HttpRequest, HttpResponseBase from django.utils.deprecation import MiddlewareMixin from core.models import CustomerProfile, TeamProfile logger = logging.getLogger(__name__) class ConditionalCorsMiddleware(BaseCorsMiddleware): """ CORS middleware that skips CORS processing when requests come through Oathkeeper. This prevents duplicate CORS headers since Oathkeeper handles CORS for proxied requests. Only applies CORS for direct requests (development/testing scenarios). """ def __call__(self, request: HttpRequest) -> HttpResponseBase: # Check if request came through Oathkeeper oathkeeper_secret = request.META.get('HTTP_X_OATHKEEPER_SECRET') expected_secret = getattr(settings, 'OATHKEEPER_SECRET', None) # If request has valid Oathkeeper secret, skip CORS processing if oathkeeper_secret and expected_secret and oathkeeper_secret == expected_secret: logger.debug(f"Skipping CORS processing for Oathkeeper request to {request.path}") response = self.get_response(request) return response # For direct requests (development, testing), apply CORS normally logger.debug(f"Applying CORS processing for direct request to {request.path}") return super().__call__(request) class OryHeaderAuthenticationMiddleware(MiddlewareMixin): """ Strict middleware that requires Ory authentication headers AND Django profiles. All users MUST have a corresponding Django profile to use the application. Security: Verifies requests came from Oathkeeper via shared secret. """ def __init__(self, get_response): self.get_response = get_response super().__init__(get_response) def process_request(self, request: HttpRequest) -> Optional[JsonResponse]: # Skip authentication for specific paths if needed if self._should_skip_auth(request): request.user = AnonymousUser() return None # 🔒 SECURITY: Verify request came from Oathkeeper oathkeeper_secret = request.META.get('HTTP_X_OATHKEEPER_SECRET') expected_secret = settings.OATHKEEPER_SECRET if not expected_secret: logger.critical("OATHKEEPER_SECRET not configured in settings!") return JsonResponse( {"detail": "Server configuration error"}, status=500 ) if oathkeeper_secret != expected_secret: logger.warning( f"Invalid or missing Oathkeeper secret from {request.META.get('REMOTE_ADDR')} " f"for path {request.path}" ) return JsonResponse( {"detail": "Forbidden - requests must come through the API gateway"}, status=403 ) # Extract required headers (now safe to trust these) user_id = request.META.get('HTTP_X_USER_ID') profile_type = request.META.get('HTTP_X_USER_PROFILE_TYPE') django_profile_id = request.META.get('HTTP_X_DJANGO_PROFILE_ID') # Reject if no user ID if not user_id: return JsonResponse( {"detail": "Authentication required - no user ID provided"}, status=401, headers={"WWW-Authenticate": 'Bearer realm="api"'} ) # Reject if no profile type if not profile_type: return JsonResponse( {"detail": "Authentication required - no profile type provided"}, status=401, headers={"WWW-Authenticate": 'Bearer realm="api"'} ) # Validate profile type if profile_type not in ['team', 'customer']: return JsonResponse( {"detail": f"Invalid profile type: {profile_type}"}, status=403 ) # Django profile ID is REQUIRED if not django_profile_id: return JsonResponse( {"detail": "Django profile is required to access this application"}, status=403 ) # Get Django profile profile = self._get_profile_by_id(django_profile_id, profile_type) if not profile: return JsonResponse( {"detail": f"Django profile {django_profile_id} not found or type mismatch"}, status=403 ) # Store Ory user information on request for additional context request.ory_user_id = user_id request.ory_user_email = request.META.get('HTTP_X_USER_EMAIL') request.ory_user_first_name = request.META.get('HTTP_X_USER_FIRST_NAME') request.ory_user_last_name = request.META.get('HTTP_X_USER_LAST_NAME') request.ory_user_phone = request.META.get('HTTP_X_USER_PHONE') request.ory_profile_type = profile_type # Set the authenticated Django user and profile request.profile = profile return None @staticmethod def _should_skip_auth(request: HttpRequest) -> bool: """Determine if authentication should be skipped for this request.""" skip_paths = [ '/admin/', '/health/', '/static/', ] # Allow CORS preflight requests if request.method == 'OPTIONS': return True return any(request.path.startswith(path) for path in skip_paths) @staticmethod def _get_profile_by_id(profile_id: str, expected_type: str) -> Optional[Union[TeamProfile, CustomerProfile]]: """ Get Django profile by ID and validate it matches the expected type. Returns None if not found or type mismatch. """ try: if expected_type == 'team': return TeamProfile.objects.select_related('user').get(id=profile_id) elif expected_type == 'customer': return CustomerProfile.objects.select_related('user').get(id=profile_id) else: logger.warning(f"Unknown profile type: {expected_type}") return None except (TeamProfile.DoesNotExist, CustomerProfile.DoesNotExist): logger.warning(f"Profile {profile_id} not found for type {expected_type}") return None class OryWebSocketAuthMiddleware: """ WebSocket middleware that authenticates using Ory headers from the initial HTTP upgrade request. Validates that users have the required Django profile to access the application. Security: Verifies requests came from Oathkeeper via shared secret. """ def __init__(self, app: Any) -> None: self.app = app async def __call__(self, scope: Dict[str, Any], receive: Any, send: Any) -> Any: if scope['type'] == 'websocket': # Extract headers from the initial HTTP upgrade request headers: Dict[bytes, bytes] = dict(scope.get('headers', [])) # 🔒 SECURITY: Verify request came from Oathkeeper oathkeeper_secret = headers.get(b'x-oathkeeper-secret', b'').decode('utf-8') expected_secret = settings.OATHKEEPER_SECRET if not expected_secret: logger.critical("OATHKEEPER_SECRET not configured for WebSocket!") await send({ 'type': 'websocket.close', 'code': 1011, # Internal error }) return None if oathkeeper_secret != expected_secret: logger.warning(f"Invalid Oathkeeper secret for WebSocket connection") await send({ 'type': 'websocket.close', 'code': 4403, # Forbidden }) return None # Ory headers should be passed through Oathkeeper (now safe to trust) user_id = headers.get(b'x-user-id', b'').decode('utf-8') profile_type = headers.get(b'x-user-profile-type', b'').decode('utf-8') django_profile_id = headers.get(b'x-django-profile-id', b'').decode('utf-8') if user_id and profile_type and django_profile_id: # Validate profile type if profile_type not in ['team', 'customer']: logger.warning(f"Invalid profile type for WebSocket: {profile_type}") await send({ 'type': 'websocket.close', 'code': 4403, # Custom close code for forbidden }) return None # Fetch the profile from the database profile: Optional[Union[TeamProfile, CustomerProfile]] = await self._get_profile(django_profile_id, profile_type) if profile: scope['profile'] = profile # Store Ory user information in scope for additional context scope['ory_user_id'] = user_id scope['ory_user_email'] = headers.get(b'x-user-email', b'').decode('utf-8') scope['ory_user_first_name'] = headers.get(b'x-user-first-name', b'').decode('utf-8') scope['ory_user_last_name'] = headers.get(b'x-user-last-name', b'').decode('utf-8') scope['ory_user_phone'] = headers.get(b'x-user-phone', b'').decode('utf-8') scope['ory_profile_type'] = profile_type else: # Reject connection if profile not found logger.warning(f"Profile {django_profile_id} not found for WebSocket connection") await send({ 'type': 'websocket.close', 'code': 4403, # Custom close code for forbidden }) return None else: # Reject connection if headers missing logger.warning(f"Missing Ory headers for WebSocket connection") await send({ 'type': 'websocket.close', 'code': 4401, # Custom close code for unauthorized }) return None return await self.app(scope, receive, send) @database_sync_to_async def _get_profile(self, profile_id: str, profile_type: str) -> Optional[Union[TeamProfile, CustomerProfile]]: """ Get Django profile by ID and validate it matches the expected type. Returns None if not found or type mismatch. """ try: if profile_type == 'team': return TeamProfile.objects.select_related('user').get(id=profile_id) elif profile_type == 'customer': return CustomerProfile.objects.select_related('user').get(id=profile_id) except (TeamProfile.DoesNotExist, CustomerProfile.DoesNotExist): logger.warning(f"Profile {profile_id} not found for type {profile_type}") return None return None