nexus-5/core/mcp/tools/notifications.py
2026-01-26 11:09:40 -05:00

258 lines
6.8 KiB
Python

"""Notification tools for MCP."""
from typing import Optional
from channels.db import database_sync_to_async
from core.mcp.auth import MCPContext, Role, execute_graphql
from core.mcp.base import mcp, json_response, error_response, logger
@mcp.tool()
async def get_my_notifications(
    unread_only: bool = False,
    limit: int = 50,
    offset: int = 0
) -> str:
    """
    Return your notifications, fetched through the `myNotifications` GraphQL query.

    Args:
        unread_only: When true, restrict the result to unread notifications
        limit: Maximum number of notifications to return (default 50)
        offset: Pagination offset

    Returns:
        JSON array of notification objects. If the GraphQL response contains
        an 'errors' key, the whole response is returned as JSON instead.
    """
    # Nothing is queried until a profile has been activated.
    if not MCPContext.get_profile():
        return error_response("No active profile. Call set_active_profile first.")

    # The query text is passed to execute_graphql as-is, so the literal is
    # reproduced verbatim (including its flush-left layout).
    gql = """
query GetNotifications($unreadOnly: Boolean, $limit: Int, $offset: Int) {
myNotifications(unreadOnly: $unreadOnly, limit: $limit, offset: $offset) {
id
subject
body
status
actionUrl
readAt
createdAt
event {
eventType
entityType
entityId
}
}
}
"""
    variables = {
        "unreadOnly": unread_only,
        "limit": limit,
        "offset": offset
    }
    response = await execute_graphql(gql, variables)

    # Hand GraphQL errors back to the caller untouched.
    if "errors" in response:
        return json_response(response)

    # A falsy payload (e.g. null) is normalised to an empty list.
    notifications = response["data"]["myNotifications"]
    return json_response(notifications if notifications else [])
@mcp.tool()
async def get_unread_notification_count() -> str:
    """
    Report how many of your notifications are still unread.

    Returns:
        JSON object holding the count under 'unread_count'. If the GraphQL
        response contains an 'errors' key, the whole response is returned
        as JSON instead.
    """
    # Nothing is queried until a profile has been activated.
    if not MCPContext.get_profile():
        return error_response("No active profile. Call set_active_profile first.")

    # The query text is passed to execute_graphql as-is, so the literal is
    # reproduced verbatim (including its flush-left layout).
    gql = """
query GetUnreadCount {
myUnreadNotificationCount
}
"""
    response = await execute_graphql(gql)

    # Hand GraphQL errors back to the caller untouched.
    if "errors" in response:
        return json_response(response)

    unread = response["data"]["myUnreadNotificationCount"]
    return json_response({"unread_count": unread})
@mcp.tool()
async def mark_notification_read(notification_id: str) -> str:
    """
    Flag a single notification as read.

    Args:
        notification_id: UUID of the notification

    Returns:
        JSON object with 'success' set to true and the updated notification
        (id, status, readAt) under 'notification'. If the GraphQL response
        contains an 'errors' key, the whole response is returned as JSON
        instead.
    """
    # Nothing is sent until a profile has been activated.
    if not MCPContext.get_profile():
        return error_response("No active profile. Call set_active_profile first.")

    # The mutation text is passed to execute_graphql as-is, so the literal is
    # reproduced verbatim (including its flush-left layout).
    gql = """
mutation MarkRead($id: ID!) {
markNotificationAsRead(id: $id) {
id
status
readAt
}
}
"""
    variables = {"id": notification_id}
    response = await execute_graphql(gql, variables)

    # Hand GraphQL errors back to the caller untouched.
    if "errors" in response:
        return json_response(response)

    payload = {
        "success": True,
        "notification": response["data"]["markNotificationAsRead"]
    }
    return json_response(payload)
@mcp.tool()
async def mark_all_notifications_read() -> str:
    """
    Flag every unread notification of yours as read.

    Returns:
        JSON object with 'success' set to true and the value returned by the
        `markAllNotificationsAsRead` mutation under 'count_marked'. If the
        GraphQL response contains an 'errors' key, the whole response is
        returned as JSON instead.
    """
    # Nothing is sent until a profile has been activated.
    if not MCPContext.get_profile():
        return error_response("No active profile. Call set_active_profile first.")

    # The mutation text is passed to execute_graphql as-is, so the literal is
    # reproduced verbatim (including its flush-left layout).
    gql = """
mutation MarkAllRead {
markAllNotificationsAsRead
}
"""
    response = await execute_graphql(gql)

    # Hand GraphQL errors back to the caller untouched.
    if "errors" in response:
        return json_response(response)

    payload = {
        "success": True,
        "count_marked": response["data"]["markAllNotificationsAsRead"]
    }
    return json_response(payload)
@mcp.tool()
async def list_my_notification_rules() -> str:
    """
    List active notification rules that apply to you.

    - ADMIN: See all active rules
    - Others: See active rules targeting their role or profile

    Returns:
        JSON array of notification rule objects. Each object carries id,
        name, description, event_types, channels, target_roles and
        is_personal (true when the rule targets your profile directly).
    """
    profile = MCPContext.get_profile()
    if not profile:
        return error_response("No active profile. Call set_active_profile first.")

    from core.models import NotificationRule

    @database_sync_to_async
    def fetch():
        if profile.role == Role.ADMIN.value:
            qs = NotificationRule.objects.filter(is_active=True)
        else:
            from django.db.models import Q

            # distinct() collapses duplicate rows produced by the join on
            # target_team_profiles.
            qs = NotificationRule.objects.filter(
                Q(is_active=True) & (
                    Q(target_roles__contains=[profile.role]) |
                    Q(target_team_profiles__id=profile.id)
                )
            ).distinct()

        # Resolve the "personal" flag with one extra query up front instead
        # of issuing an EXISTS query per rule while building the result.
        personal_ids = set(
            NotificationRule.objects.filter(
                is_active=True,
                target_team_profiles__id=profile.id,
            ).values_list("id", flat=True)
        )

        return [
            {
                "id": str(r.id),
                "name": r.name,
                "description": r.description,
                "event_types": r.event_types,
                "channels": r.channels,
                "target_roles": r.target_roles,
                "is_personal": r.id in personal_ids,
            }
            for r in qs
        ]

    rules = await fetch()
    return json_response(rules)
@mcp.tool()
async def create_personal_notification_rule(
    name: str,
    event_types: str,
    channels: str,
    description: Optional[str] = None,
    template_subject: Optional[str] = None,
    template_body: Optional[str] = None
) -> str:
    """
    Create a personal notification rule (scoped to yourself only).

    Args:
        name: Rule name
        event_types: Comma-separated event types (e.g., 'SERVICE_COMPLETED,PROJECT_COMPLETED')
        channels: Comma-separated channels (IN_APP, EMAIL, SMS)
        description: Optional description
        template_subject: Optional subject template
        template_body: Optional body template

    Returns:
        JSON object with the created rule, or an error response when no
        profile is active, when event_types or channels contains no
        non-blank entry, or when the rule could not be saved.
    """
    profile = MCPContext.get_profile()
    if not profile:
        return error_response("No active profile. Call set_active_profile first.")

    from core.models import NotificationRule
    from django.db import transaction

    # Drop blank tokens so inputs such as "A,,B," or "" never store empty
    # entries on the rule.
    event_list = [e.strip() for e in event_types.split(",") if e.strip()]
    channel_list = [c.strip() for c in channels.split(",") if c.strip()]
    if not event_list:
        return error_response("At least one event type is required.")
    if not channel_list:
        return error_response("At least one channel is required.")

    @database_sync_to_async
    def create():
        # Create the rule and attach the profile atomically: if attaching
        # fails, the rule must not be left behind without its target profile.
        with transaction.atomic():
            rule = NotificationRule.objects.create(
                name=name,
                description=description or "",
                event_types=event_list,
                channels=channel_list,
                template_subject=template_subject or "",
                template_body=template_body or "",
                is_active=True
            )
            rule.target_team_profiles.add(profile)
        return {
            "id": str(rule.id),
            "name": rule.name,
            "event_types": rule.event_types,
            "channels": rule.channels,
            "is_personal": True
        }

    try:
        rule = await create()
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising,
        # but keep the traceback in the log.
        logger.exception("Error creating notification rule: %s", e)
        return error_response(str(e))
    else:
        return json_response({"success": True, "rule": rule})