323 lines
8.1 KiB
Python
323 lines
8.1 KiB
Python
"""
|
|
MCP Authentication and Authorization Module
|
|
|
|
Handles profile context management and role-based access control for MCP tools.
|
|
"""
|
|
|
|
import json
|
|
from enum import Enum
|
|
from functools import wraps
|
|
from typing import Any, Callable, Optional
|
|
from channels.db import database_sync_to_async
|
|
from core.models.profile import TeamProfile
|
|
|
|
|
|
class MCPContext:
    """Class-level holder for the profile bound to the MCP session.

    All state lives on the class and is manipulated through classmethods,
    so callers never need an instance; ``MCPContext()`` nevertheless always
    hands back the same object.
    """

    # Sole instance, created lazily by __new__.
    _instance = None
    # Profile most recently passed to set_profile(); None when nothing is set.
    _profile: Optional[TeamProfile] = None
    # str() of that profile's ``id``, stored alongside it.
    _profile_id: Optional[str] = None

    def __new__(cls):
        """Create the shared instance on first use and reuse it afterwards."""
        shared = cls._instance
        if shared is None:
            shared = cls._instance = super().__new__(cls)
        return shared

    @classmethod
    def set_profile(cls, profile: TeamProfile):
        """Make ``profile`` the active one and remember its id as a string."""
        cls._profile = profile
        cls._profile_id = str(profile.id)

    @classmethod
    def get_profile(cls) -> Optional[TeamProfile]:
        """Return the active profile, or None when none has been set."""
        return cls._profile

    @classmethod
    def get_profile_id(cls) -> Optional[str]:
        """Return the active profile's id string, or None when none is set."""
        return cls._profile_id

    @classmethod
    def clear(cls):
        """Forget the active profile and its id."""
        cls._profile = cls._profile_id = None

    @classmethod
    def is_authenticated(cls) -> bool:
        """Report whether a profile is currently set."""
        return cls._profile is not None
|
|
|
|
|
|
class Role(str, Enum):
    """Roles understood by the MCP permission checks.

    Intended to match RoleChoices. Members also subclass ``str``, so each
    one compares equal to its raw string value.
    """

    ADMIN = "ADMIN"
    TEAM_LEADER = "TEAM_LEADER"
    TEAM_MEMBER = "TEAM_MEMBER"
|
|
|
|
|
|
# Numeric rank for each role, used by the permission checks.
# A larger number outranks a smaller one.
ROLE_HIERARCHY = {
    Role.ADMIN: 3,
    Role.TEAM_LEADER: 2,
    Role.TEAM_MEMBER: 1,
}
|
|
|
|
|
|
def get_role_level(role: str) -> int:
    """Translate a role value into its rank in ``ROLE_HIERARCHY``.

    Args:
        role: Role value such as ``"ADMIN"``.

    Returns:
        The rank registered for the role, or 0 when the value is not a
        known ``Role`` or has no entry in the hierarchy.
    """
    try:
        member = Role(role)
    except ValueError:
        # Not one of the declared roles.
        return 0
    return ROLE_HIERARCHY.get(member, 0)
|
|
|
|
|
|
async def get_team_profile(profile_id: str) -> Optional[TeamProfile]:
    """Load the TeamProfile whose primary key equals ``profile_id``.

    The ORM lookup is synchronous, so it is wrapped with
    ``database_sync_to_async`` before being awaited.

    Returns:
        The matching TeamProfile, or None when no such row exists.
    """

    def _lookup() -> Optional[TeamProfile]:
        try:
            return TeamProfile.objects.get(pk=profile_id)
        except TeamProfile.DoesNotExist:
            return None

    return await database_sync_to_async(_lookup)()
|
|
|
|
|
|
async def set_active_profile(profile_id: str) -> TeamProfile:
    """
    Look up a team profile and bind it to the MCP session.

    Args:
        profile_id: UUID of the team profile

    Returns:
        The TeamProfile that is now active

    Raises:
        PermissionError: If no profile matches ``profile_id``
    """
    found = await get_team_profile(profile_id)
    if found:
        MCPContext.set_profile(found)
        return found

    raise PermissionError(f"Profile {profile_id} not found")
|
|
|
|
|
|
def require_auth(func: Callable) -> Callable:
    """Wrap an async callable so it only runs while a profile is active.

    The returned wrapper raises PermissionError when ``MCPContext`` reports
    that no profile is set; otherwise it awaits ``func`` and returns its
    result.
    """

    @wraps(func)
    async def guarded(*args, **kwargs):
        if MCPContext.is_authenticated():
            return await func(*args, **kwargs)
        raise PermissionError(
            "No active profile. Call set_active_profile first."
        )

    return guarded
|
|
|
|
|
|
def require_role(*allowed_roles: Role) -> Callable:
    """
    Decorator to require specific roles.

    The accepted role values are resolved once, when the decorator is
    applied, instead of on every call of the wrapped tool. Roles may be
    given as enum members (anything with a ``value`` attribute) or as plain
    role strings.

    Usage:
        @require_role(Role.ADMIN, Role.TEAM_LEADER)
        async def admin_or_leader_tool():
            ...

    Args:
        *allowed_roles: Roles permitted to call the decorated coroutine.

    Returns:
        A decorator that wraps an async callable with the role check.

    Raises:
        PermissionError: Raised by the wrapper when no profile is active or
            the active profile's role is not one of ``allowed_roles``.
    """
    # Enum members contribute their ``value``; a bare string has no such
    # attribute and is used as-is rather than failing with AttributeError.
    role_values = [getattr(r, "value", r) for r in allowed_roles]

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            profile = MCPContext.get_profile()
            if not profile:
                raise PermissionError(
                    "No active profile. Call set_active_profile first."
                )

            if profile.role not in role_values:
                raise PermissionError(
                    f"Access denied. Required role: {role_values}, "
                    f"your role: {profile.role}"
                )

            return await func(*args, **kwargs)

        return wrapper

    return decorator
|
|
|
|
|
|
def require_minimum_role(min_role: Role) -> Callable:
    """
    Decorator that admits only profiles ranked at or above ``min_role``.

    Ranks come from ``ROLE_HIERARCHY``; a profile whose role is unknown is
    ranked 0 by ``get_role_level``.

    Usage:
        @require_minimum_role(Role.TEAM_LEADER)
        async def leader_or_above_tool():
            ...

    Raises:
        PermissionError: Raised by the wrapper when no profile is active or
            the active profile ranks below ``min_role``.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            active = MCPContext.get_profile()
            if not active:
                raise PermissionError(
                    "No active profile. Call set_active_profile first."
                )

            current_level = get_role_level(active.role)
            if current_level < ROLE_HIERARCHY[min_role]:
                raise PermissionError(
                    f"Access denied. Minimum role required: {min_role.value}, "
                    f"your role: {active.role}"
                )

            return await func(*args, **kwargs)

        return wrapper

    return decorator
|
|
|
|
|
|
async def check_entity_access(
    entity_type: str,
    entity_id: str,
    require_assignment: bool = True
) -> bool:
    """
    Decide whether the active profile may work with a given entity.

    ADMIN and TEAM_LEADER profiles are always allowed. Any other profile is
    allowed immediately when ``require_assignment`` is False; otherwise it
    must appear among the entity's ``team_members``.

    Args:
        entity_type: 'service' or 'project'
        entity_id: UUID of the entity
        require_assignment: Whether to check team member assignment

    Returns:
        True if access is allowed

    Raises:
        PermissionError: If there is no active profile, or the profile is
            not assigned to the entity (any other ``entity_type`` is
            treated as "not assigned")
    """
    profile = MCPContext.get_profile()
    if not profile:
        raise PermissionError("No active profile")

    # Admins and team leaders skip the assignment check, as does any caller
    # that explicitly opted out of it.
    privileged = (Role.ADMIN.value, Role.TEAM_LEADER.value)
    if profile.role in privileged or not require_assignment:
        return True

    def _is_assigned() -> bool:
        # Import only the model that the requested entity type needs.
        if entity_type == 'service':
            from core.models import Service as entity_model
        elif entity_type == 'project':
            from core.models import Project as entity_model
        else:
            return False
        return entity_model.objects.filter(
            pk=entity_id,
            team_members__id=profile.id,
        ).exists()

    # The ORM query is synchronous; run it through the async bridge.
    if await database_sync_to_async(_is_assigned)():
        return True

    raise PermissionError(
        f"Access denied. You are not assigned to this {entity_type}."
    )
|
|
|
|
|
|
class MockRequest:
    """Minimal request stand-in that carries only a ``profile`` attribute.

    Used as the ``request`` of the GraphQL execution context.
    """

    def __init__(self, profile: TeamProfile):
        # Stored unchanged so it can be read back as ``request.profile``.
        self.profile = profile
|
|
|
|
|
|
class MockContext:
    """Context object passed to GraphQL execution.

    Exposes a single ``request`` attribute holding a ``MockRequest`` built
    from the given profile.
    """

    def __init__(self, profile: TeamProfile):
        # The profile is then reachable as ``context.request.profile``.
        self.request = MockRequest(profile)
|
|
|
|
|
|
async def execute_graphql(
    query: str,
    variables: Optional[dict] = None,
    profile: Optional[TeamProfile] = None
) -> dict:
    """
    Run a GraphQL operation on behalf of a team profile.

    Args:
        query: GraphQL query string
        variables: Optional variables dict
        profile: Optional profile override (uses active profile if not provided)

    Returns:
        ``{"data": ...}`` when the result has no errors. GraphQL errors are
        not raised: they are stringified and returned as
        ``{"errors": [...], "data": ...}``.

    Raises:
        PermissionError: If no profile available
    """
    from core.graphql.schema import schema

    # Fall back to the session's active profile only when none was passed.
    acting_profile = MCPContext.get_profile() if profile is None else profile
    if not acting_profile:
        raise PermissionError(
            "No active profile. Call set_active_profile first."
        )

    graphql_context = MockContext(acting_profile)
    result = await schema.execute(
        query,
        variable_values=variables,
        context_value=graphql_context,
    )

    if not result.errors:
        return {"data": result.data}

    # Partial data, if any, is returned alongside the error messages.
    return {
        "errors": [str(err) for err in result.errors],
        "data": result.data,
    }
|
|
|
|
|
|
def json_response(data: Any, indent: int = 2) -> str:
    """Serialise ``data`` to a JSON string for an MCP response.

    Values the JSON encoder cannot handle natively are converted with
    ``str``.
    """
    serialized = json.dumps(data, default=str, indent=indent)
    return serialized
|
|
|
|
|
|
def error_response(message: str) -> str:
    """Create an error response.

    Args:
        message: Description of the failure. Normally a string; any other
            object (for example a caught exception) is rendered with
            ``str`` instead of making serialisation fail.

    Returns:
        A JSON object string of the form ``{"error": <message>}``,
        indented by two spaces.
    """
    # default=str mirrors json_response, so passing an exception instance
    # yields its text rather than a TypeError from the encoder.
    return json.dumps({"error": message}, indent=2, default=str)
|