258 lines
6.8 KiB
Python
258 lines
6.8 KiB
Python
"""Notification tools for MCP."""
|
|
|
|
from typing import Optional
|
|
|
|
from channels.db import database_sync_to_async
|
|
|
|
from core.mcp.auth import MCPContext, Role, execute_graphql
|
|
from core.mcp.base import mcp, json_response, error_response, logger
|
|
|
|
|
|
@mcp.tool()
async def get_my_notifications(
    unread_only: bool = False,
    limit: int = 50,
    offset: int = 0,
) -> str:
    """
    Get your notifications.

    An active profile is required; without one an error response is
    returned and no query is executed.

    Args:
        unread_only: If true, only return unread notifications
        limit: Maximum notifications to return (default 50)
        offset: Pagination offset

    Returns:
        JSON array of notification objects (empty array when there are
        none). If the GraphQL result contains an "errors" key, the whole
        result is returned instead.
    """
    if not MCPContext.get_profile():
        return error_response("No active profile. Call set_active_profile first.")

    gql = """
        query GetNotifications($unreadOnly: Boolean, $limit: Int, $offset: Int) {
            myNotifications(unreadOnly: $unreadOnly, limit: $limit, offset: $offset) {
                id
                subject
                body
                status
                actionUrl
                readAt
                createdAt
                event {
                    eventType
                    entityType
                    entityId
                }
            }
        }
    """
    variables = {
        "unreadOnly": unread_only,
        "limit": limit,
        "offset": offset,
    }
    response = await execute_graphql(gql, variables)

    # GraphQL errors are passed through verbatim so the caller sees them.
    if "errors" in response:
        return json_response(response)

    notifications = response["data"]["myNotifications"]
    # A null/empty field is normalised to an empty list.
    return json_response(notifications if notifications else [])
|
|
|
|
|
|
@mcp.tool()
async def get_unread_notification_count() -> str:
    """
    Get the count of unread notifications.

    An active profile is required; without one an error response is
    returned and no query is executed.

    Returns:
        JSON object with unread count under the "unread_count" key. If the
        GraphQL result contains an "errors" key, the whole result is
        returned instead.
    """
    if not MCPContext.get_profile():
        return error_response("No active profile. Call set_active_profile first.")

    gql = """
        query GetUnreadCount {
            myUnreadNotificationCount
        }
    """
    response = await execute_graphql(gql)

    if "errors" not in response:
        unread = response["data"]["myUnreadNotificationCount"]
        return json_response({"unread_count": unread})

    # GraphQL errors are passed through verbatim.
    return json_response(response)
|
|
|
|
|
|
@mcp.tool()
async def mark_notification_read(notification_id: str) -> str:
    """
    Mark a notification as read.

    An active profile is required; without one an error response is
    returned and no mutation is executed.

    Args:
        notification_id: UUID of the notification

    Returns:
        JSON object confirming the notification was marked read, with
        "success" and the updated "notification" (id, status, readAt). If
        the GraphQL result contains an "errors" key, the whole result is
        returned instead.
    """
    if not MCPContext.get_profile():
        return error_response("No active profile. Call set_active_profile first.")

    gql = """
        mutation MarkRead($id: ID!) {
            markNotificationAsRead(id: $id) {
                id
                status
                readAt
            }
        }
    """
    response = await execute_graphql(gql, {"id": notification_id})

    if "errors" not in response:
        updated = response["data"]["markNotificationAsRead"]
        return json_response({"success": True, "notification": updated})

    # GraphQL errors are passed through verbatim.
    return json_response(response)
|
|
|
|
|
|
@mcp.tool()
async def mark_all_notifications_read() -> str:
    """
    Mark all unread notifications as read.

    An active profile is required; without one an error response is
    returned and no mutation is executed.

    Returns:
        JSON object with count of notifications marked read under the
        "count_marked" key. If the GraphQL result contains an "errors"
        key, the whole result is returned instead.
    """
    if not MCPContext.get_profile():
        return error_response("No active profile. Call set_active_profile first.")

    gql = """
        mutation MarkAllRead {
            markAllNotificationsAsRead
        }
    """
    response = await execute_graphql(gql)

    # GraphQL errors are passed through verbatim.
    if "errors" in response:
        return json_response(response)

    payload = {
        "success": True,
        "count_marked": response["data"]["markAllNotificationsAsRead"],
    }
    return json_response(payload)
|
|
|
|
|
|
@mcp.tool()
async def list_my_notification_rules() -> str:
    """
    List notification rules that apply to you.
    - ADMIN: See all rules
    - Others: See rules targeting their role or profile

    Only active rules are listed. An active profile is required; without
    one an error response is returned.

    Returns:
        JSON array of notification rule objects. Each object has id, name,
        description, event_types, channels, target_roles and is_personal
        (true when the rule explicitly targets your profile).
    """
    profile = MCPContext.get_profile()
    if not profile:
        return error_response("No active profile. Call set_active_profile first.")

    # Imported lazily, as in the rest of this module's ORM-backed tools.
    from django.db.models import Q
    from core.models import NotificationRule

    @database_sync_to_async
    def fetch():
        if profile.role == Role.ADMIN.value:
            qs = NotificationRule.objects.filter(is_active=True)
        else:
            qs = NotificationRule.objects.filter(
                Q(is_active=True) & (
                    Q(target_roles__contains=[profile.role]) |
                    Q(target_team_profiles__id=profile.id)
                )
            ).distinct()

        # Resolve "is this rule personal?" with one query up front instead
        # of issuing an EXISTS query per rule inside the loop (N+1).
        personal_ids = set(
            NotificationRule.objects.filter(
                is_active=True,
                target_team_profiles__id=profile.id,
            ).values_list("id", flat=True)
        )

        return [
            {
                "id": str(r.id),
                "name": r.name,
                "description": r.description,
                "event_types": r.event_types,
                "channels": r.channels,
                "target_roles": r.target_roles,
                "is_personal": r.id in personal_ids,
            }
            for r in qs
        ]

    rules = await fetch()
    return json_response(rules)
|
|
|
|
|
|
@mcp.tool()
async def create_personal_notification_rule(
    name: str,
    event_types: str,
    channels: str,
    description: Optional[str] = None,
    template_subject: Optional[str] = None,
    template_body: Optional[str] = None
) -> str:
    """
    Create a personal notification rule (scoped to yourself only).

    An active profile is required; without one an error response is
    returned and nothing is created.

    Args:
        name: Rule name
        event_types: Comma-separated event types (e.g., 'SERVICE_COMPLETED,PROJECT_COMPLETED')
        channels: Comma-separated channels (IN_APP, EMAIL, SMS)
        description: Optional description
        template_subject: Optional subject template
        template_body: Optional body template

    Returns:
        JSON object with created rule, or an error response when a list
        argument contains no entries or the rule could not be created.
    """
    profile = MCPContext.get_profile()
    if not profile:
        return error_response("No active profile. Call set_active_profile first.")

    # Imported lazily, as in the rest of this module's ORM-backed tools.
    from django.db import transaction
    from core.models import NotificationRule

    # Drop blank entries so inputs like "A,,B", "A," or "" never store an
    # empty-string event type or channel on the rule.
    event_list = [e.strip() for e in event_types.split(",") if e.strip()]
    channel_list = [c.strip() for c in channels.split(",") if c.strip()]

    if not event_list:
        return error_response("event_types must contain at least one event type.")
    if not channel_list:
        return error_response("channels must contain at least one channel.")

    @database_sync_to_async
    def create():
        # Create the rule and attach the profile atomically: if linking the
        # profile fails, the rule must not be left behind without a target.
        with transaction.atomic():
            rule = NotificationRule.objects.create(
                name=name,
                description=description or "",
                event_types=event_list,
                channels=channel_list,
                template_subject=template_subject or "",
                template_body=template_body or "",
                is_active=True
            )
            rule.target_team_profiles.add(profile)
        return {
            "id": str(rule.id),
            "name": rule.name,
            "event_types": rule.event_types,
            "channels": rule.channels,
            "is_personal": True
        }

    try:
        rule = await create()
        return json_response({"success": True, "rule": rule})
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising,
        # but keep the traceback in the log.
        logger.exception(f"Error creating notification rule: {e}")
        return error_response(str(e))
|