280 lines
7.7 KiB
Python
280 lines
7.7 KiB
Python
"""Admin tools for MCP."""
|
|
|
|
from typing import Optional
|
|
|
|
from channels.db import database_sync_to_async
|
|
|
|
from core.mcp.auth import MCPContext, Role, execute_graphql
|
|
from core.mcp.base import mcp, json_response, error_response
|
|
|
|
|
|
@mcp.tool()
async def list_team_profiles(
    role: Optional[str] = None,
    include_admin: bool = False
) -> str:
    """
    List team member profiles. Rejected for callers whose role is TEAM_MEMBER
    (i.e. intended for TEAM_LEADER or ADMIN).

    Args:
        role: Optional exact-match role filter (e.g. TEAM_LEADER, TEAM_MEMBER)
        include_admin: When false (the default), profiles with role ADMIN
            are excluded from the result

    Returns:
        JSON array of team profile objects (id, first_name, last_name,
        email, phone, role, status), or an error response when there is
        no active profile or the caller is a TEAM_MEMBER
    """
    caller = MCPContext.get_profile()
    if not caller:
        return error_response("No active profile. Call set_active_profile first.")
    if caller.role == Role.TEAM_MEMBER.value:
        return error_response("Access denied. TEAM_LEADER or ADMIN role required.")

    # The model import is kept function-local, exactly as the original had it.
    from core.models import TeamProfile

    def _as_dict(member):
        # Flatten one TeamProfile row into a JSON-friendly dict.
        return {
            "id": str(member.id),
            "first_name": member.first_name,
            "last_name": member.last_name,
            "email": member.email,
            "phone": member.phone,
            "role": member.role,
            "status": member.status
        }

    def _load_rows():
        # Runs synchronously; the ORM query is evaluated inside this function.
        members = TeamProfile.objects.all()
        if not include_admin:
            members = members.exclude(role='ADMIN')
        if role:
            members = members.filter(role=role)
        return [_as_dict(member) for member in members]

    # Wrap the sync loader so it can be awaited from this coroutine.
    rows = await database_sync_to_async(_load_rows)()
    return json_response(rows)
|
|
|
|
|
|
@mcp.tool()
async def list_notification_rules(is_active: Optional[bool] = None) -> str:
    """
    List notification rules via the GraphQL API. Requires ADMIN role.

    Args:
        is_active: Optional filter by active status; passed through to the
            query as-is (None is sent when omitted)

    Returns:
        JSON array of notification rule objects, or the raw GraphQL result
        when it contains an "errors" key
    """
    caller = MCPContext.get_profile()
    if not caller:
        return error_response("No active profile. Call set_active_profile first.")
    if caller.role != Role.ADMIN.value:
        return error_response("Access denied. ADMIN role required.")

    rules_query = """
        query ListRules($isActive: Boolean) {
            notificationRules(isActive: $isActive) {
                id
                name
                description
                eventTypes
                channels
                targetRoles
                isActive
                templateSubject
                templateBody
            }
        }
    """
    variables = {"isActive": is_active}

    response = await execute_graphql(rules_query, variables)

    # Surface GraphQL errors to the caller untouched.
    if "errors" in response:
        return json_response(response)

    rules = response["data"]["notificationRules"]
    # A null/empty field is normalised to an empty JSON array.
    return json_response(rules if rules else [])
|
|
|
|
|
|
@mcp.tool()
async def create_notification_rule(
    name: str,
    event_types: str,
    channels: str,
    description: Optional[str] = None,
    target_roles: Optional[str] = None,
    target_team_profile_ids: Optional[str] = None,
    template_subject: Optional[str] = None,
    template_body: Optional[str] = None,
    is_active: bool = True
) -> str:
    """
    Create a system-wide notification rule. Requires ADMIN role.

    Comma-separated arguments are split on ",", each item is stripped of
    surrounding whitespace, and empty items (from trailing or doubled
    commas) are discarded before being sent to the API.

    Args:
        name: Rule name
        event_types: Comma-separated event types (at least one required)
        channels: Comma-separated channels (IN_APP, EMAIL, SMS); at least
            one required
        description: Optional description
        target_roles: Comma-separated roles to notify (ADMIN, TEAM_LEADER, TEAM_MEMBER)
        target_team_profile_ids: Comma-separated profile UUIDs to notify
        template_subject: Subject template with {variables}
        template_body: Body template with {variables}
        is_active: Whether rule is active (default true)

    Returns:
        JSON object with "success" and the created "rule"; the raw GraphQL
        result when it contains "errors"; or an error response when there
        is no active profile, the caller is not ADMIN, or a required list
        is empty
    """
    def _split_csv(raw):
        # Stripped, non-empty tokens only: "A, B," -> ["A", "B"], None/"" -> [].
        return [
            token
            for token in (part.strip() for part in (raw or "").split(","))
            if token
        ]

    profile = MCPContext.get_profile()
    if not profile:
        return error_response("No active profile. Call set_active_profile first.")

    if profile.role != Role.ADMIN.value:
        return error_response("Access denied. ADMIN role required.")

    # Validate required lists up front so a blank or comma-only argument is
    # reported clearly instead of forwarding [""] to the mutation.
    parsed_event_types = _split_csv(event_types)
    if not parsed_event_types:
        return error_response("event_types must contain at least one event type.")

    parsed_channels = _split_csv(channels)
    if not parsed_channels:
        return error_response("channels must contain at least one channel.")

    mutation = """
        mutation CreateRule($input: NotificationRuleInput!) {
            createNotificationRule(input: $input) {
                id
                name
                eventTypes
                channels
                targetRoles
                isActive
            }
        }
    """

    input_data = {
        "name": name,
        "eventTypes": parsed_event_types,
        "channels": parsed_channels,
        "description": description,
        "templateSubject": template_subject,
        "templateBody": template_body,
        "isActive": is_active
    }

    # Optional targeting lists are only sent when they contain real values.
    parsed_roles = _split_csv(target_roles)
    if parsed_roles:
        input_data["targetRoles"] = parsed_roles

    parsed_profile_ids = _split_csv(target_team_profile_ids)
    if parsed_profile_ids:
        input_data["targetTeamProfileIds"] = parsed_profile_ids

    result = await execute_graphql(mutation, {"input": input_data})

    # Surface GraphQL errors to the caller untouched.
    if "errors" in result:
        return json_response(result)

    return json_response({
        "success": True,
        "rule": result["data"]["createNotificationRule"]
    })
|
|
|
|
|
|
@mcp.tool()
async def update_notification_rule(
    rule_id: str,
    name: Optional[str] = None,
    event_types: Optional[str] = None,
    channels: Optional[str] = None,
    target_roles: Optional[str] = None,
    is_active: Optional[bool] = None
) -> str:
    """
    Update a notification rule. Requires ADMIN role.

    Only the fields that are supplied are sent to the API. Comma-separated
    arguments are split on ",", each item is stripped, and empty items
    (from trailing or doubled commas) are discarded; a list argument that
    yields no items is treated as "not supplied" and left unchanged.

    Args:
        rule_id: UUID of the rule to update
        name: New rule name (ignored when empty)
        event_types: Comma-separated event types
        channels: Comma-separated channels
        target_roles: Comma-separated roles
        is_active: Whether rule is active; sent whenever it is not None,
            so False is honoured

    Returns:
        JSON object with "success" and the updated "rule"; the raw GraphQL
        result when it contains "errors"; or an error response when there
        is no active profile or the caller is not ADMIN
    """
    def _split_csv(raw):
        # Stripped, non-empty tokens only: "A, B," -> ["A", "B"], None/"" -> [].
        return [
            token
            for token in (part.strip() for part in (raw or "").split(","))
            if token
        ]

    profile = MCPContext.get_profile()
    if not profile:
        return error_response("No active profile. Call set_active_profile first.")

    if profile.role != Role.ADMIN.value:
        return error_response("Access denied. ADMIN role required.")

    mutation = """
        mutation UpdateRule($input: NotificationRuleUpdateInput!) {
            updateNotificationRule(input: $input) {
                id
                name
                eventTypes
                channels
                isActive
            }
        }
    """

    input_data = {"id": rule_id}
    if name:
        input_data["name"] = name

    # List fields: include only when at least one real token was given, so
    # input such as "," can never overwrite the stored list with [""].
    list_fields = (
        ("eventTypes", event_types),
        ("channels", channels),
        ("targetRoles", target_roles),
    )
    for field_name, raw_value in list_fields:
        tokens = _split_csv(raw_value)
        if tokens:
            input_data[field_name] = tokens

    # Explicit None check so that is_active=False is still sent.
    if is_active is not None:
        input_data["isActive"] = is_active

    result = await execute_graphql(mutation, {"input": input_data})

    # Surface GraphQL errors to the caller untouched.
    if "errors" in result:
        return json_response(result)

    return json_response({
        "success": True,
        "rule": result["data"]["updateNotificationRule"]
    })
|
|
|
|
|
|
@mcp.tool()
async def delete_notification_rule(rule_id: str) -> str:
    """
    Delete a notification rule via the GraphQL API. Requires ADMIN role.

    Args:
        rule_id: UUID of the rule to delete

    Returns:
        JSON object with "success" and "deleted" (the value returned by the
        deleteNotificationRule mutation), or the raw GraphQL result when it
        contains an "errors" key
    """
    caller = MCPContext.get_profile()
    if not caller:
        return error_response("No active profile. Call set_active_profile first.")
    if caller.role != Role.ADMIN.value:
        return error_response("Access denied. ADMIN role required.")

    delete_mutation = """
        mutation DeleteRule($id: ID!) {
            deleteNotificationRule(id: $id)
        }
    """
    variables = {"id": rule_id}

    response = await execute_graphql(delete_mutation, variables)

    # Surface GraphQL errors to the caller untouched.
    if "errors" in response:
        return json_response(response)

    payload = {
        "success": True,
        "deleted": response["data"]["deleteNotificationRule"]
    }
    return json_response(payload)
|