nexus-5/core/mcp/auth.py
2026-01-26 11:09:40 -05:00

323 lines
8.1 KiB
Python

"""
MCP Authentication and Authorization Module
Handles profile context management and role-based access control for MCP tools.
"""
import json
from enum import Enum
from functools import wraps
from typing import Any, Callable, Optional
from channels.db import database_sync_to_async
from core.models.profile import TeamProfile
class MCPContext:
"""Singleton context manager for MCP session state."""
_instance = None
_profile: Optional[TeamProfile] = None
_profile_id: Optional[str] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
def set_profile(cls, profile: TeamProfile):
"""Set the active profile for this session."""
cls._profile = profile
cls._profile_id = str(profile.id)
@classmethod
def get_profile(cls) -> Optional[TeamProfile]:
"""Get the active profile."""
return cls._profile
@classmethod
def get_profile_id(cls) -> Optional[str]:
"""Get the active profile ID."""
return cls._profile_id
@classmethod
def clear(cls):
"""Clear the session context."""
cls._profile = None
cls._profile_id = None
@classmethod
def is_authenticated(cls) -> bool:
"""Check if a profile is set."""
return cls._profile is not None
class Role(str, Enum):
"""Role enumeration matching RoleChoices."""
ADMIN = "ADMIN"
TEAM_LEADER = "TEAM_LEADER"
TEAM_MEMBER = "TEAM_MEMBER"
# Role hierarchy for permission checks
ROLE_HIERARCHY = {
Role.ADMIN: 3,
Role.TEAM_LEADER: 2,
Role.TEAM_MEMBER: 1,
}
def get_role_level(role: str) -> int:
"""Get numeric level for a role."""
try:
return ROLE_HIERARCHY[Role(role)]
except (ValueError, KeyError):
return 0
async def get_team_profile(profile_id: str) -> Optional[TeamProfile]:
"""Fetch a TeamProfile by ID."""
@database_sync_to_async
def fetch():
try:
return TeamProfile.objects.get(pk=profile_id)
except TeamProfile.DoesNotExist:
return None
return await fetch()
async def set_active_profile(profile_id: str) -> TeamProfile:
"""
Set the active profile for the MCP session.
Args:
profile_id: UUID of the team profile
Returns:
The TeamProfile instance
Raises:
PermissionError: If profile not found
"""
profile = await get_team_profile(profile_id)
if not profile:
raise PermissionError(f"Profile {profile_id} not found")
MCPContext.set_profile(profile)
return profile
def require_auth(func: Callable) -> Callable:
"""Decorator to require authentication."""
@wraps(func)
async def wrapper(*args, **kwargs):
if not MCPContext.is_authenticated():
raise PermissionError(
"No active profile. Call set_active_profile first."
)
return await func(*args, **kwargs)
return wrapper
def require_role(*allowed_roles: Role) -> Callable:
"""
Decorator to require specific roles.
Usage:
@require_role(Role.ADMIN, Role.TEAM_LEADER)
async def admin_or_leader_tool():
...
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
profile = MCPContext.get_profile()
if not profile:
raise PermissionError(
"No active profile. Call set_active_profile first."
)
role_values = [r.value for r in allowed_roles]
if profile.role not in role_values:
raise PermissionError(
f"Access denied. Required role: {role_values}, "
f"your role: {profile.role}"
)
return await func(*args, **kwargs)
return wrapper
return decorator
def require_minimum_role(min_role: Role) -> Callable:
"""
Decorator to require a minimum role level.
Usage:
@require_minimum_role(Role.TEAM_LEADER)
async def leader_or_above_tool():
...
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
profile = MCPContext.get_profile()
if not profile:
raise PermissionError(
"No active profile. Call set_active_profile first."
)
user_level = get_role_level(profile.role)
required_level = ROLE_HIERARCHY[min_role]
if user_level < required_level:
raise PermissionError(
f"Access denied. Minimum role required: {min_role.value}, "
f"your role: {profile.role}"
)
return await func(*args, **kwargs)
return wrapper
return decorator
async def check_entity_access(
entity_type: str,
entity_id: str,
require_assignment: bool = True
) -> bool:
"""
Check if the current profile has access to an entity.
For ADMIN and TEAM_LEADER: always allowed
For TEAM_MEMBER: only if assigned (when require_assignment=True)
Args:
entity_type: 'service' or 'project'
entity_id: UUID of the entity
require_assignment: Whether to check team member assignment
Returns:
True if access is allowed
Raises:
PermissionError: If access denied
"""
profile = MCPContext.get_profile()
if not profile:
raise PermissionError("No active profile")
# Admin and Team Leader have full access
if profile.role in [Role.ADMIN.value, Role.TEAM_LEADER.value]:
return True
if not require_assignment:
return True
# Team Member must be assigned
@database_sync_to_async
def check_assignment():
if entity_type == 'service':
from core.models import Service
return Service.objects.filter(
pk=entity_id,
team_members__id=profile.id
).exists()
elif entity_type == 'project':
from core.models import Project
return Project.objects.filter(
pk=entity_id,
team_members__id=profile.id
).exists()
return False
is_assigned = await check_assignment()
if not is_assigned:
raise PermissionError(
f"Access denied. You are not assigned to this {entity_type}."
)
return True
class MockRequest:
"""Mock request object for GraphQL context."""
def __init__(self, profile: TeamProfile):
self.profile = profile
class MockContext:
"""Mock context for GraphQL execution."""
def __init__(self, profile: TeamProfile):
self.request = MockRequest(profile)
async def execute_graphql(
query: str,
variables: Optional[dict] = None,
profile: Optional[TeamProfile] = None
) -> dict:
"""
Execute a GraphQL query with profile context.
Args:
query: GraphQL query string
variables: Optional variables dict
profile: Optional profile override (uses active profile if not provided)
Returns:
GraphQL result data
Raises:
PermissionError: If no profile available
Exception: If GraphQL errors occur
"""
from core.graphql.schema import schema
if profile is None:
profile = MCPContext.get_profile()
if not profile:
raise PermissionError(
"No active profile. Call set_active_profile first."
)
context = MockContext(profile)
result = await schema.execute(
query,
variable_values=variables,
context_value=context
)
if result.errors:
error_messages = [str(e) for e in result.errors]
return {
"errors": error_messages,
"data": result.data
}
return {"data": result.data}
def json_response(data: Any, indent: int = 2) -> str:
"""Convert data to JSON string for MCP response."""
return json.dumps(data, indent=indent, default=str)
def error_response(message: str) -> str:
"""Create an error response."""
return json.dumps({"error": message}, indent=2)