""" MCP Authentication and Authorization Module Handles profile context management and role-based access control for MCP tools. """ import json from enum import Enum from functools import wraps from typing import Any, Callable, Optional from channels.db import database_sync_to_async from core.models.profile import TeamProfile class MCPContext: """Singleton context manager for MCP session state.""" _instance = None _profile: Optional[TeamProfile] = None _profile_id: Optional[str] = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance @classmethod def set_profile(cls, profile: TeamProfile): """Set the active profile for this session.""" cls._profile = profile cls._profile_id = str(profile.id) @classmethod def get_profile(cls) -> Optional[TeamProfile]: """Get the active profile.""" return cls._profile @classmethod def get_profile_id(cls) -> Optional[str]: """Get the active profile ID.""" return cls._profile_id @classmethod def clear(cls): """Clear the session context.""" cls._profile = None cls._profile_id = None @classmethod def is_authenticated(cls) -> bool: """Check if a profile is set.""" return cls._profile is not None class Role(str, Enum): """Role enumeration matching RoleChoices.""" ADMIN = "ADMIN" TEAM_LEADER = "TEAM_LEADER" TEAM_MEMBER = "TEAM_MEMBER" # Role hierarchy for permission checks ROLE_HIERARCHY = { Role.ADMIN: 3, Role.TEAM_LEADER: 2, Role.TEAM_MEMBER: 1, } def get_role_level(role: str) -> int: """Get numeric level for a role.""" try: return ROLE_HIERARCHY[Role(role)] except (ValueError, KeyError): return 0 async def get_team_profile(profile_id: str) -> Optional[TeamProfile]: """Fetch a TeamProfile by ID.""" @database_sync_to_async def fetch(): try: return TeamProfile.objects.get(pk=profile_id) except TeamProfile.DoesNotExist: return None return await fetch() async def set_active_profile(profile_id: str) -> TeamProfile: """ Set the active profile for the MCP session. Args: profile_id: UUID of the team profile Returns: The TeamProfile instance Raises: PermissionError: If profile not found """ profile = await get_team_profile(profile_id) if not profile: raise PermissionError(f"Profile {profile_id} not found") MCPContext.set_profile(profile) return profile def require_auth(func: Callable) -> Callable: """Decorator to require authentication.""" @wraps(func) async def wrapper(*args, **kwargs): if not MCPContext.is_authenticated(): raise PermissionError( "No active profile. Call set_active_profile first." ) return await func(*args, **kwargs) return wrapper def require_role(*allowed_roles: Role) -> Callable: """ Decorator to require specific roles. Usage: @require_role(Role.ADMIN, Role.TEAM_LEADER) async def admin_or_leader_tool(): ... """ def decorator(func: Callable) -> Callable: @wraps(func) async def wrapper(*args, **kwargs): profile = MCPContext.get_profile() if not profile: raise PermissionError( "No active profile. Call set_active_profile first." ) role_values = [r.value for r in allowed_roles] if profile.role not in role_values: raise PermissionError( f"Access denied. Required role: {role_values}, " f"your role: {profile.role}" ) return await func(*args, **kwargs) return wrapper return decorator def require_minimum_role(min_role: Role) -> Callable: """ Decorator to require a minimum role level. Usage: @require_minimum_role(Role.TEAM_LEADER) async def leader_or_above_tool(): ... """ def decorator(func: Callable) -> Callable: @wraps(func) async def wrapper(*args, **kwargs): profile = MCPContext.get_profile() if not profile: raise PermissionError( "No active profile. Call set_active_profile first." ) user_level = get_role_level(profile.role) required_level = ROLE_HIERARCHY[min_role] if user_level < required_level: raise PermissionError( f"Access denied. Minimum role required: {min_role.value}, " f"your role: {profile.role}" ) return await func(*args, **kwargs) return wrapper return decorator async def check_entity_access( entity_type: str, entity_id: str, require_assignment: bool = True ) -> bool: """ Check if the current profile has access to an entity. For ADMIN and TEAM_LEADER: always allowed For TEAM_MEMBER: only if assigned (when require_assignment=True) Args: entity_type: 'service' or 'project' entity_id: UUID of the entity require_assignment: Whether to check team member assignment Returns: True if access is allowed Raises: PermissionError: If access denied """ profile = MCPContext.get_profile() if not profile: raise PermissionError("No active profile") # Admin and Team Leader have full access if profile.role in [Role.ADMIN.value, Role.TEAM_LEADER.value]: return True if not require_assignment: return True # Team Member must be assigned @database_sync_to_async def check_assignment(): if entity_type == 'service': from core.models import Service return Service.objects.filter( pk=entity_id, team_members__id=profile.id ).exists() elif entity_type == 'project': from core.models import Project return Project.objects.filter( pk=entity_id, team_members__id=profile.id ).exists() return False is_assigned = await check_assignment() if not is_assigned: raise PermissionError( f"Access denied. You are not assigned to this {entity_type}." ) return True class MockRequest: """Mock request object for GraphQL context.""" def __init__(self, profile: TeamProfile): self.profile = profile class MockContext: """Mock context for GraphQL execution.""" def __init__(self, profile: TeamProfile): self.request = MockRequest(profile) async def execute_graphql( query: str, variables: Optional[dict] = None, profile: Optional[TeamProfile] = None ) -> dict: """ Execute a GraphQL query with profile context. Args: query: GraphQL query string variables: Optional variables dict profile: Optional profile override (uses active profile if not provided) Returns: GraphQL result data Raises: PermissionError: If no profile available Exception: If GraphQL errors occur """ from core.graphql.schema import schema if profile is None: profile = MCPContext.get_profile() if not profile: raise PermissionError( "No active profile. Call set_active_profile first." ) context = MockContext(profile) result = await schema.execute( query, variable_values=variables, context_value=context ) if result.errors: error_messages = [str(e) for e in result.errors] return { "errors": error_messages, "data": result.data } return {"data": result.data} def json_response(data: Any, indent: int = 2) -> str: """Convert data to JSON string for MCP response.""" return json.dumps(data, indent=indent, default=str) def error_response(message: str) -> str: """Create an error response.""" return json.dumps({"error": message}, indent=2)