495 lines
16 KiB
Python
495 lines
16 KiB
Python
"""Service tools for MCP."""
|
|
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from channels.db import database_sync_to_async
|
|
|
|
from core.mcp.auth import MCPContext, Role, execute_graphql
|
|
from core.mcp.base import mcp, json_response, error_response
|
|
|
|
|
|
@mcp.tool()
|
|
async def list_services(
|
|
limit: int = 25,
|
|
customer_id: Optional[str] = None,
|
|
account_id: Optional[str] = None,
|
|
status: Optional[str] = None,
|
|
date: Optional[str] = None,
|
|
start_date: Optional[str] = None,
|
|
end_date: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
List services with optional filters.
|
|
- ADMIN/TEAM_LEADER: See all services
|
|
- TEAM_MEMBER: See only assigned services
|
|
|
|
Args:
|
|
limit: Maximum services to return (default 25)
|
|
customer_id: Optional customer UUID to filter by
|
|
account_id: Optional account UUID to filter by
|
|
status: Optional status filter (SCHEDULED, IN_PROGRESS, COMPLETED, CANCELLED)
|
|
date: Optional exact date in YYYY-MM-DD format
|
|
start_date: Optional range start date
|
|
end_date: Optional range end date
|
|
|
|
Returns:
|
|
JSON array of service objects with full context
|
|
"""
|
|
profile = MCPContext.get_profile()
|
|
if not profile:
|
|
return error_response("No active profile. Call set_active_profile first.")
|
|
|
|
from core.models import Service
|
|
|
|
@database_sync_to_async
|
|
def fetch():
|
|
qs = Service.objects.select_related(
|
|
'account_address__account__customer'
|
|
).prefetch_related('team_members')
|
|
|
|
if profile.role == Role.TEAM_MEMBER.value:
|
|
qs = qs.filter(team_members__id=profile.id)
|
|
|
|
if customer_id:
|
|
qs = qs.filter(account_address__account__customer_id=customer_id)
|
|
if account_id:
|
|
qs = qs.filter(account_address__account_id=account_id)
|
|
if status:
|
|
qs = qs.filter(status=status)
|
|
if date:
|
|
qs = qs.filter(date=datetime.strptime(date, "%Y-%m-%d").date())
|
|
if start_date:
|
|
qs = qs.filter(date__gte=datetime.strptime(start_date, "%Y-%m-%d").date())
|
|
if end_date:
|
|
qs = qs.filter(date__lte=datetime.strptime(end_date, "%Y-%m-%d").date())
|
|
|
|
qs = qs.order_by('-date')[:limit]
|
|
|
|
results = []
|
|
for s in qs:
|
|
addr = s.account_address
|
|
results.append({
|
|
"id": str(s.id),
|
|
"date": str(s.date),
|
|
"status": s.status,
|
|
"customer": addr.account.customer.name,
|
|
"account": addr.account.name,
|
|
"location": addr.name or "Primary",
|
|
"address": f"{addr.street_address}, {addr.city}, {addr.state} {addr.zip_code}",
|
|
"team_members": [
|
|
f"{t.first_name} {t.last_name}".strip()
|
|
for t in s.team_members.all() if t.role != 'ADMIN'
|
|
],
|
|
"notes": s.notes or ""
|
|
})
|
|
return results
|
|
|
|
services = await fetch()
|
|
return json_response(services)
|
|
|
|
|
|
@mcp.tool()
|
|
async def get_service(service_id: str) -> str:
|
|
"""
|
|
Get detailed service information including scope and tasks.
|
|
- ADMIN/TEAM_LEADER: Any service
|
|
- TEAM_MEMBER: Only if assigned
|
|
|
|
Args:
|
|
service_id: UUID of the service
|
|
|
|
Returns:
|
|
JSON object with full service details
|
|
"""
|
|
profile = MCPContext.get_profile()
|
|
if not profile:
|
|
return error_response("No active profile. Call set_active_profile first.")
|
|
|
|
from core.models import Service
|
|
|
|
@database_sync_to_async
|
|
def fetch():
|
|
try:
|
|
s = Service.objects.select_related(
|
|
'account_address__account__customer',
|
|
'account_address__scope'
|
|
).prefetch_related(
|
|
'team_members',
|
|
'account_address__scope__areas__tasks'
|
|
).get(pk=service_id)
|
|
|
|
if profile.role == Role.TEAM_MEMBER.value:
|
|
if not s.team_members.filter(id=profile.id).exists():
|
|
return {"error": "Access denied. You are not assigned to this service."}
|
|
|
|
addr = s.account_address
|
|
scope_data = None
|
|
|
|
if hasattr(addr, 'scope') and addr.scope:
|
|
scope = addr.scope
|
|
scope_data = {
|
|
"id": str(scope.id),
|
|
"name": scope.name,
|
|
"areas": [
|
|
{
|
|
"id": str(area.id),
|
|
"name": area.name,
|
|
"tasks": [
|
|
{
|
|
"id": str(task.id),
|
|
"description": task.checklist_description,
|
|
"frequency": task.frequency
|
|
}
|
|
for task in area.tasks.all()
|
|
]
|
|
}
|
|
for area in scope.areas.all()
|
|
]
|
|
}
|
|
|
|
return {
|
|
"id": str(s.id),
|
|
"date": str(s.date),
|
|
"status": s.status,
|
|
"notes": s.notes,
|
|
"customer": {
|
|
"id": str(addr.account.customer.id),
|
|
"name": addr.account.customer.name
|
|
},
|
|
"account": {"id": str(addr.account.id), "name": addr.account.name},
|
|
"location": {
|
|
"id": str(addr.id),
|
|
"name": addr.name or "Primary",
|
|
"street_address": addr.street_address,
|
|
"city": addr.city,
|
|
"state": addr.state,
|
|
"zip_code": addr.zip_code
|
|
},
|
|
"team_members": [
|
|
{
|
|
"id": str(t.id),
|
|
"name": f"{t.first_name} {t.last_name}".strip(),
|
|
"email": t.email,
|
|
"phone": t.phone
|
|
}
|
|
for t in s.team_members.all() if t.role != 'ADMIN'
|
|
],
|
|
"scope": scope_data
|
|
}
|
|
except Service.DoesNotExist:
|
|
return {"error": f"Service {service_id} not found"}
|
|
|
|
result = await fetch()
|
|
if "error" in result:
|
|
return error_response(result["error"])
|
|
|
|
return json_response(result)
|
|
|
|
|
|
@mcp.tool()
|
|
async def create_service(
|
|
account_address_id: str,
|
|
date: str,
|
|
status: str = "SCHEDULED",
|
|
team_member_ids: Optional[str] = None,
|
|
notes: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
Create a new service. Requires ADMIN role.
|
|
|
|
Args:
|
|
account_address_id: UUID of the account address
|
|
date: Service date in YYYY-MM-DD format
|
|
status: Status (default SCHEDULED)
|
|
team_member_ids: Comma-separated UUIDs of team members to assign
|
|
notes: Optional notes
|
|
|
|
Returns:
|
|
JSON object with created service
|
|
"""
|
|
profile = MCPContext.get_profile()
|
|
if not profile:
|
|
return error_response("No active profile. Call set_active_profile first.")
|
|
|
|
if profile.role != Role.ADMIN.value:
|
|
return error_response("Access denied. ADMIN role required.")
|
|
|
|
mutation = """
|
|
mutation CreateService($input: ServiceInput!) {
|
|
createService(input: $input) {
|
|
id
|
|
date
|
|
status
|
|
notes
|
|
}
|
|
}
|
|
"""
|
|
|
|
variables = {
|
|
"input": {
|
|
"accountAddressId": account_address_id,
|
|
"date": date,
|
|
"status": status,
|
|
"notes": notes
|
|
}
|
|
}
|
|
|
|
if team_member_ids:
|
|
variables["input"]["teamMemberIds"] = [
|
|
tid.strip() for tid in team_member_ids.split(",")
|
|
]
|
|
|
|
result = await execute_graphql(mutation, variables)
|
|
|
|
if "errors" in result:
|
|
return json_response(result)
|
|
|
|
return json_response({"success": True, "service": result["data"]["createService"]})
|
|
|
|
|
|
@mcp.tool()
|
|
async def update_service(
|
|
service_id: str,
|
|
date: Optional[str] = None,
|
|
status: Optional[str] = None,
|
|
team_member_ids: Optional[str] = None,
|
|
notes: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
Update an existing service. Requires ADMIN role.
|
|
|
|
Args:
|
|
service_id: UUID of the service to update
|
|
date: New date in YYYY-MM-DD format
|
|
status: New status
|
|
team_member_ids: Comma-separated UUIDs of team members (replaces existing)
|
|
notes: Updated notes
|
|
|
|
Returns:
|
|
JSON object with updated service
|
|
"""
|
|
profile = MCPContext.get_profile()
|
|
if not profile:
|
|
return error_response("No active profile. Call set_active_profile first.")
|
|
|
|
if profile.role != Role.ADMIN.value:
|
|
return error_response("Access denied. ADMIN role required.")
|
|
|
|
mutation = """
|
|
mutation UpdateService($input: ServiceUpdateInput!) {
|
|
updateService(input: $input) {
|
|
id
|
|
date
|
|
status
|
|
notes
|
|
}
|
|
}
|
|
"""
|
|
|
|
input_data = {"id": service_id}
|
|
if date:
|
|
input_data["date"] = date
|
|
if status:
|
|
input_data["status"] = status
|
|
if notes is not None:
|
|
input_data["notes"] = notes
|
|
if team_member_ids:
|
|
input_data["teamMemberIds"] = [
|
|
tid.strip() for tid in team_member_ids.split(",")
|
|
]
|
|
|
|
result = await execute_graphql(mutation, {"input": input_data})
|
|
|
|
if "errors" in result:
|
|
return json_response(result)
|
|
|
|
return json_response({"success": True, "service": result["data"]["updateService"]})
|
|
|
|
|
|
@mcp.tool()
|
|
async def delete_service(service_id: str) -> str:
|
|
"""
|
|
Delete a service. Requires ADMIN role.
|
|
|
|
WARNING: This is a destructive action that cannot be undone.
|
|
|
|
Args:
|
|
service_id: UUID of the service to delete
|
|
|
|
Returns:
|
|
JSON object confirming deletion
|
|
"""
|
|
profile = MCPContext.get_profile()
|
|
if not profile:
|
|
return error_response("No active profile. Call set_active_profile first.")
|
|
|
|
if profile.role != Role.ADMIN.value:
|
|
return error_response("Access denied. ADMIN role required.")
|
|
|
|
# First get service details for confirmation message
|
|
from core.models import Service
|
|
|
|
@database_sync_to_async
|
|
def get_service_info():
|
|
try:
|
|
s = Service.objects.select_related(
|
|
'account_address__account'
|
|
).get(pk=service_id)
|
|
return {
|
|
"date": str(s.date),
|
|
"account": s.account_address.account.name if s.account_address else None,
|
|
"status": s.status
|
|
}
|
|
except Service.DoesNotExist:
|
|
return None
|
|
|
|
service_info = await get_service_info()
|
|
if not service_info:
|
|
return error_response(f"Service {service_id} not found")
|
|
|
|
mutation = """
|
|
mutation DeleteService($id: ID!) {
|
|
deleteService(id: $id)
|
|
}
|
|
"""
|
|
|
|
result = await execute_graphql(mutation, {"id": service_id})
|
|
|
|
if "errors" in result:
|
|
return json_response(result)
|
|
|
|
return json_response({
|
|
"success": True,
|
|
"deleted_service": {
|
|
"id": service_id,
|
|
**service_info
|
|
}
|
|
})
|
|
|
|
|
|
@mcp.tool()
|
|
async def create_services_bulk(
|
|
services_json: str
|
|
) -> str:
|
|
"""
|
|
Create multiple services in a single operation. Requires ADMIN role.
|
|
|
|
This is useful for creating annual calendars or scheduling multiple services at once.
|
|
Each service is validated before any are created (all-or-nothing).
|
|
|
|
Args:
|
|
services_json: JSON array of service objects, each containing:
|
|
- account_address_id: UUID of the account address (required)
|
|
- date: Service date in YYYY-MM-DD format (required)
|
|
- status: Status (optional, default SCHEDULED)
|
|
- team_member_ids: Array of team member UUIDs (optional)
|
|
- notes: Notes (optional)
|
|
|
|
Example:
|
|
[
|
|
{"account_address_id": "uuid1", "date": "2026-01-06"},
|
|
{"account_address_id": "uuid2", "date": "2026-01-10", "notes": "Special instructions"}
|
|
]
|
|
|
|
Returns:
|
|
JSON object with created services count and IDs
|
|
"""
|
|
import json
|
|
from django.db import transaction
|
|
|
|
profile = MCPContext.get_profile()
|
|
if not profile:
|
|
return error_response("No active profile. Call set_active_profile first.")
|
|
|
|
if profile.role != Role.ADMIN.value:
|
|
return error_response("Access denied. ADMIN role required.")
|
|
|
|
# Parse the JSON input
|
|
try:
|
|
services_data = json.loads(services_json)
|
|
except json.JSONDecodeError as e:
|
|
return error_response(f"Invalid JSON: {str(e)}")
|
|
|
|
if not isinstance(services_data, list):
|
|
return error_response("services_json must be a JSON array")
|
|
|
|
if len(services_data) == 0:
|
|
return error_response("services_json array is empty")
|
|
|
|
if len(services_data) > 500:
|
|
return error_response("Maximum 500 services per bulk operation")
|
|
|
|
# Validate all services have required fields
|
|
for i, svc in enumerate(services_data):
|
|
if not isinstance(svc, dict):
|
|
return error_response(f"Service at index {i} must be an object")
|
|
if "account_address_id" not in svc:
|
|
return error_response(f"Service at index {i} missing account_address_id")
|
|
if "date" not in svc:
|
|
return error_response(f"Service at index {i} missing date")
|
|
|
|
from core.models import Service, AccountAddress
|
|
|
|
@database_sync_to_async
|
|
def create_all():
|
|
# Validate all account addresses exist
|
|
address_ids = list(set(svc["account_address_id"] for svc in services_data))
|
|
existing_addresses = set(
|
|
str(a.id) for a in AccountAddress.objects.filter(id__in=address_ids)
|
|
)
|
|
|
|
for addr_id in address_ids:
|
|
if addr_id not in existing_addresses:
|
|
raise ValueError(f"Account address {addr_id} not found")
|
|
|
|
# Check for duplicate date/address combinations
|
|
date_address_combos = [
|
|
(svc["date"], svc["account_address_id"]) for svc in services_data
|
|
]
|
|
|
|
# Check against existing services
|
|
existing = Service.objects.filter(
|
|
account_address_id__in=address_ids
|
|
).values_list('date', 'account_address_id')
|
|
|
|
existing_combos = set((str(d), str(a)) for d, a in existing)
|
|
for date_val, addr_id in date_address_combos:
|
|
if (date_val, addr_id) in existing_combos:
|
|
raise ValueError(
|
|
f"Service already exists for address {addr_id} on {date_val}"
|
|
)
|
|
|
|
# Create all services in a transaction
|
|
with transaction.atomic():
|
|
to_create = []
|
|
for svc in services_data:
|
|
service = Service(
|
|
account_address_id=svc["account_address_id"],
|
|
date=datetime.strptime(svc["date"], "%Y-%m-%d").date(),
|
|
status=svc.get("status", "SCHEDULED"),
|
|
notes=svc.get("notes")
|
|
)
|
|
to_create.append(service)
|
|
|
|
created = Service.objects.bulk_create(to_create)
|
|
|
|
# Handle team member assignments if provided
|
|
for i, service in enumerate(created):
|
|
team_ids = services_data[i].get("team_member_ids", [])
|
|
if team_ids:
|
|
service.team_members.set(team_ids)
|
|
|
|
return [str(s.id) for s in created]
|
|
|
|
try:
|
|
created_ids = await create_all()
|
|
except ValueError as e:
|
|
return error_response(str(e))
|
|
|
|
return json_response({
|
|
"success": True,
|
|
"created_count": len(created_ids),
|
|
"service_ids": created_ids
|
|
})
|