2026-01-26 11:09:40 -05:00

468 lines
18 KiB
Python

from typing import List, cast
from uuid import UUID
import strawberry
from channels.db import database_sync_to_async
from django.core.exceptions import ValidationError
from strawberry import Info
from core.graphql.inputs.session import OpenServiceSessionInput, CloseServiceSessionInput, RevertServiceSessionInput, ProjectSessionStartInput, \
ProjectSessionCloseInput, ProjectSessionRevertInput
from core.graphql.pubsub import pubsub
from core.graphql.types.session import ServiceSessionType, ProjectSessionType
from core.models.profile import TeamProfile
from core.models.scope import Task
from core.models.session import ServiceSession, ProjectSession
from core.models.project_scope import ProjectScopeTask
from core.services.session_service import SessionService
from core.services.events import (
publish_service_session_opened, publish_service_session_closed, publish_service_session_reverted,
publish_service_task_completed, publish_service_task_uncompleted,
publish_project_session_opened, publish_project_session_closed, publish_project_session_reverted,
publish_project_task_completed, publish_project_task_uncompleted,
)
# Relations loaded together with every session returned from a mutation.
# NOTE(review): presumably eager-loaded so GraphQL field resolution does not need
# lazy ORM access from the async context - confirm against the session types.
_SESSION_RELATIONS = ("account", "account_address", "customer", "scope", "created_by", "closed_by")


def _require_team_profile(info: Info, action: str) -> TeamProfile:
    """Return the team profile attached to the request or raise ``ValidationError``.

    ``action`` completes the error message, e.g. ``"open service sessions"``.
    """
    # Use Oathkeeper authentication
    profile = getattr(info.context.request, "profile", None)
    if not profile or not isinstance(profile, TeamProfile):
        raise ValidationError(f"Authentication required. Only team members can {action}.")
    return profile


def _parse_uuid(raw: object, error_message: str) -> UUID:
    """Convert a client-supplied ID to ``UUID``, raising ``ValidationError`` if malformed."""
    try:
        return UUID(str(raw))
    except ValueError as exc:
        # Surface malformed IDs as validation errors like every other bad-input case here.
        raise ValidationError(error_message) from exc


def _parse_unique_uuids(raw_ids, error_message: str) -> List[UUID]:
    """Parse an iterable of IDs into UUIDs, dropping duplicates and keeping first-seen order.

    De-duplication matters because callers compare the number of IDs with the number of rows
    returned by a ``pk__in`` filter, which yields each row only once.
    """
    return list(dict.fromkeys(_parse_uuid(raw, error_message) for raw in raw_ids))


def _get_or_invalid(model, pk: UUID, error_message: str):
    """Synchronously fetch ``model`` by primary key, mapping ``DoesNotExist`` to ``ValidationError``."""
    try:
        return model.objects.get(pk=pk)
    except model.DoesNotExist as exc:
        raise ValidationError(error_message) from exc


def _fetch_session(model, parent_relation: str, session_id):
    """Synchronously load one session row with its related objects and completed tasks."""
    return (
        model.objects
        .select_related(parent_relation, *_SESSION_RELATIONS)
        .prefetch_related("completed_tasks")
        .get(pk=session_id)
    )


async def _load_service_session(session_id) -> ServiceSession:
    """Load a ``ServiceSession`` (with relations) from async code."""
    return await database_sync_to_async(_fetch_session)(ServiceSession, "service", session_id)


async def _load_project_session(session_id) -> ProjectSession:
    """Load a ``ProjectSession`` (with relations) from async code."""
    return await database_sync_to_async(_fetch_session)(ProjectSession, "project", session_id)


@strawberry.type
class Mutation:
    """GraphQL mutations for opening, closing, reverting and updating work sessions.

    Every mutation requires ``info.context.request.profile`` to be a ``TeamProfile`` and raises
    ``ValidationError`` otherwise. Malformed or unknown IDs are also reported as
    ``ValidationError``. Errors raised by ``SessionService`` propagate unchanged.
    """

    @strawberry.mutation(description="Revert an active service session back to scheduled (deletes the active session)")
    async def revert_service_session(self, input: RevertServiceSessionInput, info: Info) -> bool:
        """Revert the service's session via ``SessionService`` and publish the reverted event.

        Returns ``True`` once the event has been published.
        """
        profile = _require_team_profile(info, "revert service sessions")
        service_pk = _parse_uuid(input.service_id, "Invalid service ID.")

        svc = SessionService()
        result = await database_sync_to_async(svc.revert_session)(
            entity_type="service",
            entity_id=service_pk,
            actor=profile,
        )

        # Publish event
        await publish_service_session_reverted(
            session_id=str(result.session_id),
            service_id=str(result.entity_id),
            triggered_by=profile,
        )
        return True

    @strawberry.mutation(description="Open a service session for a scheduled service")
    async def open_service_session(self, input: OpenServiceSessionInput, info: Info) -> ServiceSessionType:
        """Open a session for the service, publish the opened event and return the session."""
        profile = _require_team_profile(info, "open service sessions")
        service_pk = _parse_uuid(input.service_id, "Invalid service ID.")

        svc = SessionService()
        result = await database_sync_to_async(svc.open_session)(
            entity_type="service",
            entity_id=service_pk,
            actor=profile,
        )
        session = await _load_service_session(result.session_id)

        # Publish event
        await publish_service_session_opened(
            session_id=str(result.session_id),
            service_id=str(result.entity_id),
            triggered_by=profile,
        )
        return cast(ServiceSessionType, cast(object, session))

    @strawberry.mutation(description="Close the active service session and record completed tasks")
    async def close_service_session(self, input: CloseServiceSessionInput, info: Info) -> ServiceSessionType:
        """Close the service's session with the given completed tasks and publish the closed event.

        Raises ``ValidationError`` when any task ID is malformed or does not match a ``Task``.
        Repeated task IDs are collapsed before validation.
        """
        profile = _require_team_profile(info, "close service sessions")
        service_pk = _parse_uuid(input.service_id, "Invalid service ID.")
        task_pks = _parse_unique_uuids(input.task_ids or [], "One or more task IDs are invalid.")

        tasks: List[Task] = await database_sync_to_async(
            lambda: list(Task.objects.filter(pk__in=task_pks))
        )()
        # Fewer rows than unique IDs means at least one ID does not exist.
        if len(tasks) != len(task_pks):
            raise ValidationError("One or more task IDs are invalid.")

        svc = SessionService()
        result = await database_sync_to_async(svc.close_session)(
            entity_type="service",
            entity_id=service_pk,
            actor=profile,
            tasks=tasks,
        )
        session = await _load_service_session(result.session_id)

        # Get account name and service date for notifications
        account_name = session.account.name if session.account else None
        service_date = str(session.service.date) if session.service and session.service.date else None

        # Publish event
        await publish_service_session_closed(
            session_id=str(result.session_id),
            service_id=str(result.entity_id),
            triggered_by=profile,
            metadata={
                'account_name': account_name,
                'date': service_date,
            },
        )
        return cast(ServiceSessionType, cast(object, session))

    @strawberry.mutation(description="Add a task completion to an active service session")
    async def add_task_completion(self, info: Info, service_id: strawberry.ID, task_id: strawberry.ID,
                                  notes: str | None = None) -> ServiceSessionType:
        """Record a task completion on the service's session and publish the completed event.

        Raises ``ValidationError`` when the task ID is malformed or unknown.
        """
        profile = _require_team_profile(info, "add task completions")
        svc = SessionService()
        task_pk = _parse_uuid(task_id, "Invalid task ID.")
        service_pk = _parse_uuid(service_id, "Invalid service ID.")

        # Load task to get name for event (and to reject unknown IDs up front)
        task = await database_sync_to_async(_get_or_invalid)(Task, task_pk, "Invalid task ID.")

        session_id = await database_sync_to_async(svc.add_task_completion)(
            service_id=service_pk,
            task_id=task_pk,
            actor=profile,
            notes=notes,
        )
        session = await _load_service_session(session_id)

        # Publish event
        await publish_service_task_completed(
            task_id=str(task_pk),
            service_id=str(service_pk),
            task_name=task.checklist_description,
            triggered_by=profile,
        )
        return cast(ServiceSessionType, cast(object, session))

    @strawberry.mutation(description="Remove a task completion from an active service session")
    async def remove_task_completion(self, info: Info, service_id: strawberry.ID,
                                     task_id: strawberry.ID) -> ServiceSessionType:
        """Remove a task completion from the service's session and publish the uncompleted event.

        Raises ``ValidationError`` when the task ID is malformed or unknown.
        """
        profile = _require_team_profile(info, "remove task completions")
        svc = SessionService()
        task_pk = _parse_uuid(task_id, "Invalid task ID.")
        service_pk = _parse_uuid(service_id, "Invalid service ID.")

        # Load task to get name for event (and to reject unknown IDs up front)
        task = await database_sync_to_async(_get_or_invalid)(Task, task_pk, "Invalid task ID.")

        session_id = await database_sync_to_async(svc.remove_task_completion)(
            service_id=service_pk,
            task_id=task_pk,
        )
        session = await _load_service_session(session_id)

        # Publish event
        await publish_service_task_uncompleted(
            task_id=str(task_pk),
            service_id=str(service_pk),
            task_name=task.checklist_description,
            triggered_by=profile,
        )
        return cast(ServiceSessionType, cast(object, session))

    @strawberry.mutation(description="Add a task completion to an active project session")
    async def add_project_task_completion(self, info: Info, project_id: strawberry.ID, task_id: strawberry.ID,
                                          notes: str | None = None) -> ProjectSessionType:
        """Record a task completion on the project's session and publish the completed event.

        Raises ``ValidationError`` when the project task ID is malformed or unknown.
        """
        profile = _require_team_profile(info, "add project task completions")
        svc = SessionService()
        task_pk = _parse_uuid(task_id, "Invalid project task ID.")
        project_pk = _parse_uuid(project_id, "Invalid project ID.")

        # Load task to get name and validate it exists
        task = await database_sync_to_async(_get_or_invalid)(
            ProjectScopeTask, task_pk, "Invalid project task ID."
        )

        session_id = await database_sync_to_async(svc.add_project_task_completion)(
            project_id=project_pk,
            task_id=task_pk,
            actor=profile,
            notes=notes,
        )
        session = await _load_project_session(session_id)

        # Publish event
        await publish_project_task_completed(
            task_id=str(task_pk),
            project_id=str(project_pk),
            task_name=task.checklist_description,
            triggered_by=profile,
        )
        return cast(ProjectSessionType, cast(object, session))

    @strawberry.mutation(description="Remove a task completion from an active project session")
    async def remove_project_task_completion(self, info: Info, project_id: strawberry.ID,
                                             task_id: strawberry.ID) -> ProjectSessionType:
        """Remove a task completion from the project's session and publish the uncompleted event.

        Raises ``ValidationError`` when the project task ID is malformed or unknown.
        """
        profile = _require_team_profile(info, "remove project task completions")
        svc = SessionService()
        task_pk = _parse_uuid(task_id, "Invalid project task ID.")
        project_pk = _parse_uuid(project_id, "Invalid project ID.")

        # Load task to get name for event (and to reject unknown IDs up front)
        task = await database_sync_to_async(_get_or_invalid)(
            ProjectScopeTask, task_pk, "Invalid project task ID."
        )

        session_id = await database_sync_to_async(svc.remove_project_task_completion)(
            project_id=project_pk,
            task_id=task_pk,
        )
        session = await _load_project_session(session_id)

        # Publish event
        await publish_project_task_uncompleted(
            task_id=str(task_pk),
            project_id=str(project_pk),
            task_name=task.checklist_description,
            triggered_by=profile,
        )
        return cast(ProjectSessionType, cast(object, session))

    @strawberry.mutation(description="Start a new ProjectSession for a scheduled project")
    async def open_project_session(self, input: ProjectSessionStartInput, info: Info) -> ProjectSessionType:
        """Open a session for the project, notify subscribers and return the session."""
        profile = _require_team_profile(info, "start project sessions")
        project_pk = _parse_uuid(input.project_id, "Invalid project ID.")

        svc = SessionService()
        result = await database_sync_to_async(svc.open_session)(
            entity_type="project",
            entity_id=project_pk,
            actor=profile,
        )
        session = await _load_project_session(result.session_id)

        # Notify listeners that the project was updated (status change, etc.)
        await pubsub.publish("project_updated", result.entity_id)
        await pubsub.publish("project_session_created", result.session_id)

        # Publish event
        await publish_project_session_opened(
            session_id=str(result.session_id),
            project_id=str(result.entity_id),
            triggered_by=profile,
        )
        return cast(ProjectSessionType, cast(object, session))

    @strawberry.mutation(description="Close the active ProjectSession")
    async def close_project_session(self, input: ProjectSessionCloseInput, info: Info) -> ProjectSessionType:
        """Close the project's session, optionally recording completed tasks, and notify subscribers.

        Raises ``ValidationError`` when any task ID is malformed or does not match a
        ``ProjectScopeTask``. Repeated task IDs are collapsed before validation. When no task IDs
        are supplied, ``tasks=None`` is passed to the service.
        """
        profile = _require_team_profile(info, "close project sessions")
        project_pk = _parse_uuid(input.project_id, "Invalid project ID.")
        task_pks = _parse_unique_uuids(
            input.completed_task_ids or [], "One or more project task IDs are invalid."
        )

        # Load ProjectScopeTask objects for the provided IDs
        tasks: List[ProjectScopeTask] = []
        if task_pks:
            tasks = await database_sync_to_async(
                lambda: list(ProjectScopeTask.objects.filter(pk__in=task_pks))
            )()
            # Fewer rows than unique IDs means at least one ID does not exist.
            if len(tasks) != len(task_pks):
                raise ValidationError("One or more project task IDs are invalid.")

        # Let the service manage select_for_update inside its @transaction.atomic
        svc = SessionService()
        result = await database_sync_to_async(svc.close_session)(
            entity_type="project",
            entity_id=project_pk,
            actor=profile,
            tasks=tasks if task_pks else None,
        )
        session = await _load_project_session(result.session_id)

        await pubsub.publish("project_updated", result.entity_id)
        await pubsub.publish("project_session_closed", result.session_id)

        # Get account/customer name and project date for notifications
        if session.account:
            account_name = session.account.name
        elif session.customer:
            account_name = session.customer.name
        else:
            account_name = None
        project_date = str(session.project.date) if session.project and session.project.date else None

        # Publish event
        await publish_project_session_closed(
            session_id=str(result.session_id),
            project_id=str(result.entity_id),
            triggered_by=profile,
            metadata={
                'account_name': account_name,
                'date': project_date,
            },
        )
        return cast(ProjectSessionType, cast(object, session))

    @strawberry.mutation(description="Revert the active ProjectSession back to scheduled (deletes the active session)")
    async def revert_project_session(self, input: ProjectSessionRevertInput, info: Info) -> bool:
        """Revert the project's session via ``SessionService`` and notify subscribers.

        Returns ``True`` once the notifications have been published.
        """
        profile = _require_team_profile(info, "revert project sessions")
        project_pk = _parse_uuid(input.project_id, "Invalid project ID.")

        svc = SessionService()
        result = await database_sync_to_async(svc.revert_session)(
            entity_type="project",
            entity_id=project_pk,
            actor=profile,
        )

        # Publish project updated to reflect status change
        await pubsub.publish("project_updated", result.entity_id)

        # Publish event
        await publish_project_session_reverted(
            session_id=str(result.session_id),
            project_id=str(result.entity_id),
            triggered_by=profile,
        )
        return True