nexus-5/core/chat/service.py
2026-01-26 11:09:40 -05:00

628 lines
26 KiB
Python

"""
Chat service that integrates Claude with Nexus MCP tools.
"""
import json
import logging
from typing import AsyncGenerator, Optional, List, Dict, Any
import anthropic
from django.conf import settings
from core.models import TeamProfile
from core.models.chat import ChatConversation, ChatMessage
logger = logging.getLogger(__name__)
# Tools that require confirmation before execution
DESTRUCTIVE_ACTIONS = {
'delete_service',
'delete_project',
'create_services_bulk',
}
# System prompt for the assistant
SYSTEM_PROMPT = """You are a helpful assistant for Nexus, a field service management system used by your organization.
You have access to tools to query and manage:
- Customers and their accounts
- Services (scheduled cleaning visits)
- Projects (one-time work)
- Team member schedules
- Session tracking and task completion
- Notifications
Be concise and helpful. When asked about data, use the appropriate tools to fetch current information.
When performing destructive actions like deletion or bulk creation, clearly confirm what will be affected.
Format responses in markdown when appropriate for better readability."""
def get_mcp_tools() -> List[Dict[str, Any]]:
"""
Get the list of MCP tools as Anthropic tool definitions.
"""
# Import here to avoid circular imports
from core.mcp.tools.auth import set_active_profile, get_my_profile
from core.mcp.tools.dashboard import get_my_schedule, get_system_stats
from core.mcp.tools.customers import list_customers, get_customer, list_accounts, get_account
from core.mcp.tools.services import list_services, get_service, create_service, update_service, delete_service, create_services_bulk
from core.mcp.tools.projects import list_projects, get_project, create_project, update_project, delete_project
from core.mcp.tools.sessions import get_active_session, open_session, close_session, revert_session, add_task_completion, remove_task_completion
from core.mcp.tools.notifications import get_my_notifications, get_unread_notification_count, mark_notification_read, mark_all_notifications_read
# Map function to tool definition
tools = [
# Dashboard
{
"name": "get_my_schedule",
"description": "Get your assigned services and projects for a date range.",
"input_schema": {
"type": "object",
"properties": {
"start_date": {"type": "string", "description": "Start date in YYYY-MM-DD format"},
"end_date": {"type": "string", "description": "End date in YYYY-MM-DD format"},
"status": {"type": "string", "description": "Optional status filter"}
}
}
},
{
"name": "get_system_stats",
"description": "Get high-level system statistics. Requires ADMIN or TEAM_LEADER role.",
"input_schema": {"type": "object", "properties": {}}
},
# Customers
{
"name": "list_customers",
"description": "List customers with optional filtering. Requires ADMIN or TEAM_LEADER role.",
"input_schema": {
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Maximum customers to return (default 25)"},
"search": {"type": "string", "description": "Search term for customer name"},
"is_active": {"type": "boolean", "description": "Filter by active status"}
}
}
},
{
"name": "get_customer",
"description": "Get detailed customer information including accounts.",
"input_schema": {
"type": "object",
"properties": {
"customer_id": {"type": "string", "description": "UUID of the customer"}
},
"required": ["customer_id"]
}
},
{
"name": "list_accounts",
"description": "List accounts with optional filtering.",
"input_schema": {
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Maximum accounts to return"},
"customer_id": {"type": "string", "description": "Filter by customer UUID"},
"search": {"type": "string", "description": "Search term"},
"is_active": {"type": "boolean", "description": "Filter by active status"}
}
}
},
{
"name": "get_account",
"description": "Get detailed account information.",
"input_schema": {
"type": "object",
"properties": {
"account_id": {"type": "string", "description": "UUID of the account"}
},
"required": ["account_id"]
}
},
# Services
{
"name": "list_services",
"description": "List services with optional filters.",
"input_schema": {
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Maximum services to return"},
"customer_id": {"type": "string", "description": "Filter by customer UUID"},
"account_id": {"type": "string", "description": "Filter by account UUID"},
"status": {"type": "string", "description": "Status filter (SCHEDULED, IN_PROGRESS, COMPLETED, CANCELLED)"},
"date": {"type": "string", "description": "Exact date in YYYY-MM-DD format"},
"start_date": {"type": "string", "description": "Range start date"},
"end_date": {"type": "string", "description": "Range end date"}
}
}
},
{
"name": "get_service",
"description": "Get detailed service information including scope and tasks.",
"input_schema": {
"type": "object",
"properties": {
"service_id": {"type": "string", "description": "UUID of the service"}
},
"required": ["service_id"]
}
},
{
"name": "create_service",
"description": "Create a new service. Requires ADMIN role.",
"input_schema": {
"type": "object",
"properties": {
"account_address_id": {"type": "string", "description": "UUID of the account address"},
"date": {"type": "string", "description": "Service date in YYYY-MM-DD format"},
"status": {"type": "string", "description": "Status (default SCHEDULED)"},
"team_member_ids": {"type": "string", "description": "Comma-separated team member UUIDs"},
"notes": {"type": "string", "description": "Optional notes"}
},
"required": ["account_address_id", "date"]
}
},
{
"name": "update_service",
"description": "Update an existing service. Requires ADMIN role.",
"input_schema": {
"type": "object",
"properties": {
"service_id": {"type": "string", "description": "UUID of the service"},
"date": {"type": "string", "description": "New date"},
"status": {"type": "string", "description": "New status"},
"team_member_ids": {"type": "string", "description": "Comma-separated team member UUIDs"},
"notes": {"type": "string", "description": "Updated notes"}
},
"required": ["service_id"]
}
},
{
"name": "delete_service",
"description": "Delete a service. Requires ADMIN role. WARNING: This is destructive.",
"input_schema": {
"type": "object",
"properties": {
"service_id": {"type": "string", "description": "UUID of the service to delete"}
},
"required": ["service_id"]
}
},
{
"name": "create_services_bulk",
"description": "Create multiple services at once. Requires ADMIN role. Max 500 services.",
"input_schema": {
"type": "object",
"properties": {
"services_json": {"type": "string", "description": "JSON array of service objects with account_address_id, date, status, notes"}
},
"required": ["services_json"]
}
},
# Projects
{
"name": "list_projects",
"description": "List projects with optional filters.",
"input_schema": {
"type": "object",
"properties": {
"limit": {"type": "integer", "description": "Maximum projects to return"},
"customer_id": {"type": "string", "description": "Filter by customer UUID"},
"status": {"type": "string", "description": "Status filter"},
"date": {"type": "string", "description": "Exact date"},
"start_date": {"type": "string", "description": "Range start"},
"end_date": {"type": "string", "description": "Range end"}
}
}
},
{
"name": "get_project",
"description": "Get detailed project information.",
"input_schema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "UUID of the project"}
},
"required": ["project_id"]
}
},
{
"name": "create_project",
"description": "Create a new project. Requires ADMIN role.",
"input_schema": {
"type": "object",
"properties": {
"customer_id": {"type": "string", "description": "UUID of the customer"},
"name": {"type": "string", "description": "Project name"},
"date": {"type": "string", "description": "Project date"},
"labor": {"type": "number", "description": "Labor cost"},
"amount": {"type": "number", "description": "Total amount"},
"account_address_id": {"type": "string", "description": "UUID of account address"},
"street_address": {"type": "string", "description": "Freeform street address"},
"city": {"type": "string", "description": "City"},
"state": {"type": "string", "description": "State"},
"zip_code": {"type": "string", "description": "Zip code"},
"team_member_ids": {"type": "string", "description": "Comma-separated UUIDs"},
"notes": {"type": "string", "description": "Notes"}
},
"required": ["customer_id", "name", "date", "labor"]
}
},
{
"name": "update_project",
"description": "Update an existing project. Requires ADMIN role.",
"input_schema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "UUID of the project"},
"name": {"type": "string"},
"date": {"type": "string"},
"status": {"type": "string"},
"labor": {"type": "number"},
"amount": {"type": "number"},
"team_member_ids": {"type": "string"},
"notes": {"type": "string"}
},
"required": ["project_id"]
}
},
{
"name": "delete_project",
"description": "Delete a project. Requires ADMIN role. WARNING: This is destructive.",
"input_schema": {
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "UUID of the project to delete"}
},
"required": ["project_id"]
}
},
# Sessions
{
"name": "get_active_session",
"description": "Get the active session for a service or project.",
"input_schema": {
"type": "object",
"properties": {
"entity_type": {"type": "string", "description": "Either 'service' or 'project'"},
"entity_id": {"type": "string", "description": "UUID of the service or project"}
},
"required": ["entity_type", "entity_id"]
}
},
{
"name": "open_session",
"description": "Start a work session for a service or project.",
"input_schema": {
"type": "object",
"properties": {
"entity_type": {"type": "string", "description": "Either 'service' or 'project'"},
"entity_id": {"type": "string", "description": "UUID"}
},
"required": ["entity_type", "entity_id"]
}
},
{
"name": "close_session",
"description": "Complete a work session and mark tasks as done.",
"input_schema": {
"type": "object",
"properties": {
"entity_type": {"type": "string"},
"entity_id": {"type": "string"},
"completed_task_ids": {"type": "string", "description": "Comma-separated task UUIDs"}
},
"required": ["entity_type", "entity_id"]
}
},
# Notifications
{
"name": "get_my_notifications",
"description": "Get your notifications.",
"input_schema": {
"type": "object",
"properties": {
"unread_only": {"type": "boolean"},
"limit": {"type": "integer"}
}
}
},
{
"name": "get_unread_notification_count",
"description": "Get count of unread notifications.",
"input_schema": {"type": "object", "properties": {}}
},
{
"name": "mark_all_notifications_read",
"description": "Mark all notifications as read.",
"input_schema": {"type": "object", "properties": {}}
}
]
return tools
async def execute_tool(tool_name: str, tool_input: Dict[str, Any], profile: TeamProfile) -> str:
"""
Execute an MCP tool and return the result as a string.
"""
# Import tool functions
from core.mcp.tools import dashboard, customers, services, projects, sessions, notifications
from core.mcp.auth import MCPContext
# Set the active profile for the MCP context
MCPContext.set_profile(profile)
# Map tool names to functions
tool_map = {
# Dashboard
"get_my_schedule": dashboard.get_my_schedule,
"get_system_stats": dashboard.get_system_stats,
# Customers
"list_customers": customers.list_customers,
"get_customer": customers.get_customer,
"list_accounts": customers.list_accounts,
"get_account": customers.get_account,
# Services
"list_services": services.list_services,
"get_service": services.get_service,
"create_service": services.create_service,
"update_service": services.update_service,
"delete_service": services.delete_service,
"create_services_bulk": services.create_services_bulk,
# Projects
"list_projects": projects.list_projects,
"get_project": projects.get_project,
"create_project": projects.create_project,
"update_project": projects.update_project,
"delete_project": projects.delete_project,
# Sessions
"get_active_session": sessions.get_active_session,
"open_session": sessions.open_session,
"close_session": sessions.close_session,
"revert_session": sessions.revert_session,
"add_task_completion": sessions.add_task_completion,
"remove_task_completion": sessions.remove_task_completion,
# Notifications
"get_my_notifications": notifications.get_my_notifications,
"get_unread_notification_count": notifications.get_unread_notification_count,
"mark_notification_read": notifications.mark_notification_read,
"mark_all_notifications_read": notifications.mark_all_notifications_read,
}
func = tool_map.get(tool_name)
if not func:
return json.dumps({"error": f"Unknown tool: {tool_name}"})
try:
result = await func(**tool_input)
return result
except Exception as e:
logger.exception(f"Error executing tool {tool_name}")
return json.dumps({"error": str(e)})
class ChatService:
"""
Service for handling chat conversations with Claude.
"""
def __init__(self, profile: TeamProfile):
self.profile = profile
self.client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
self.tools = get_mcp_tools()
async def get_or_create_conversation(self, conversation_id: Optional[str] = None) -> ChatConversation:
"""Get existing conversation or create a new one."""
from channels.db import database_sync_to_async
if conversation_id:
@database_sync_to_async
def get_conv():
return ChatConversation.objects.filter(
id=conversation_id,
team_profile=self.profile,
is_active=True
).first()
conv = await get_conv()
if conv:
return conv
# Create new conversation
@database_sync_to_async
def create_conv():
return ChatConversation.objects.create(
team_profile=self.profile,
title=""
)
return await create_conv()
async def get_conversation_messages(self, conversation: ChatConversation) -> List[Dict[str, Any]]:
"""Get message history for Claude API format."""
from channels.db import database_sync_to_async
@database_sync_to_async
def fetch_messages():
messages = []
for msg in conversation.messages.all().order_by('created_at'):
messages.append({
"role": msg.role,
"content": msg.content
})
return messages
return await fetch_messages()
async def save_message(
self,
conversation: ChatConversation,
role: str,
content: str,
tool_calls: Optional[List] = None,
tool_results: Optional[List] = None
) -> ChatMessage:
"""Save a message to the conversation."""
from channels.db import database_sync_to_async
@database_sync_to_async
def create_message():
msg = ChatMessage.objects.create(
conversation=conversation,
role=role,
content=content,
tool_calls=tool_calls or [],
tool_results=tool_results or []
)
# Update conversation title if first user message
if role == 'user' and not conversation.title:
conversation.title = content[:50] + ('...' if len(content) > 50 else '')
conversation.save(update_fields=['title', 'updated_at'])
return msg
return await create_message()
async def stream_response(
self,
conversation: ChatConversation,
user_message: str
) -> AsyncGenerator[Dict[str, Any], None]:
"""
Stream a response from Claude, handling tool calls.
Yields events:
- {"type": "message_start", "conversation_id": str}
- {"type": "text", "content": str}
- {"type": "tool_call", "tool": str, "input": dict}
- {"type": "tool_result", "tool": str, "result": str}
- {"type": "message_end", "message_id": str}
- {"type": "error", "error": str}
"""
# Save user message
await self.save_message(conversation, 'user', user_message)
# Get conversation history
messages = await self.get_conversation_messages(conversation)
yield {"type": "message_start", "conversation_id": str(conversation.id)}
try:
full_response = ""
tool_calls = []
tool_results = []
# Keep processing until we get a final response (no more tool calls)
while True:
# Create message with streaming
async with self.client.messages.stream(
model=settings.ANTHROPIC_MODEL,
max_tokens=4096,
system=SYSTEM_PROMPT,
messages=messages,
tools=self.tools,
) as stream:
current_tool_use = None
current_tool_input = ""
async for event in stream:
if event.type == "content_block_start":
if event.content_block.type == "tool_use":
current_tool_use = {
"id": event.content_block.id,
"name": event.content_block.name,
}
current_tool_input = ""
elif event.type == "content_block_delta":
if event.delta.type == "text_delta":
full_response += event.delta.text
yield {"type": "text", "content": event.delta.text}
elif event.delta.type == "input_json_delta":
current_tool_input += event.delta.partial_json
elif event.type == "content_block_stop":
if current_tool_use:
try:
tool_input = json.loads(current_tool_input) if current_tool_input else {}
except json.JSONDecodeError:
tool_input = {}
current_tool_use["input"] = tool_input
tool_calls.append(current_tool_use)
yield {
"type": "tool_call",
"id": current_tool_use["id"],
"tool": current_tool_use["name"],
"input": tool_input,
"requires_confirmation": current_tool_use["name"] in DESTRUCTIVE_ACTIONS
}
current_tool_use = None
current_tool_input = ""
# Get the final message to check stop reason
final_message = await stream.get_final_message()
# If there are tool calls, execute them and continue
if final_message.stop_reason == "tool_use":
# Execute each tool call
tool_use_results = []
for tool_call in tool_calls:
if tool_call not in [t for t in tool_use_results]:
result = await execute_tool(
tool_call["name"],
tool_call["input"],
self.profile
)
tool_results.append({
"id": tool_call["id"],
"tool": tool_call["name"],
"result": result
})
yield {
"type": "tool_result",
"id": tool_call["id"],
"tool": tool_call["name"],
"result": result
}
tool_use_results.append({
"type": "tool_result",
"tool_use_id": tool_call["id"],
"content": result
})
# Add assistant message with tool use and tool results to continue conversation
messages.append({
"role": "assistant",
"content": final_message.content
})
messages.append({
"role": "user",
"content": tool_use_results
})
# Clear tool calls for next iteration
tool_calls = []
else:
# No more tool calls, we're done
break
# Save assistant message
msg = await self.save_message(
conversation,
'assistant',
full_response,
tool_calls=tool_calls,
tool_results=tool_results
)
yield {"type": "message_end", "message_id": str(msg.id)}
except Exception as e:
logger.exception("Error streaming response")
yield {"type": "error", "error": str(e)}