from typing import List, cast
from uuid import UUID

import strawberry
from channels.db import database_sync_to_async
from django.core.exceptions import ValidationError
from strawberry import Info

from core.graphql.inputs.session import (
    OpenServiceSessionInput,
    CloseServiceSessionInput,
    RevertServiceSessionInput,
    ProjectSessionStartInput,
    ProjectSessionCloseInput,
    ProjectSessionRevertInput,
)
from core.graphql.pubsub import pubsub
from core.graphql.types.session import ServiceSessionType, ProjectSessionType
from core.models.profile import TeamProfile
from core.models.scope import Task
from core.models.session import ServiceSession, ProjectSession
from core.models.project_scope import ProjectScopeTask
from core.services.session_service import SessionService
from core.services.events import (
    publish_service_session_opened,
    publish_service_session_closed,
    publish_service_session_reverted,
    publish_service_task_completed,
    publish_service_task_uncompleted,
    publish_project_session_opened,
    publish_project_session_closed,
    publish_project_session_reverted,
    publish_project_task_completed,
    publish_project_task_uncompleted,
)

# Relations eagerly loaded with a session so the returned object can be read
# from async code without triggering lazy (synchronous) database access.
_SERVICE_SESSION_RELATIONS = (
    "service", "account", "account_address", "customer", "scope", "created_by", "closed_by",
)
_PROJECT_SESSION_RELATIONS = (
    "project", "account", "account_address", "customer", "scope", "created_by", "closed_by",
)


def _require_team_profile(info: Info, action: str) -> TeamProfile:
    """Return the request's team profile, or raise if the caller is not a team member.

    Args:
        info: Resolver info; the profile is read from ``info.context.request.profile``.
        action: Phrase completing "Only team members can ..." in the error message.

    Raises:
        ValidationError: If no profile is attached to the request or it is not a
            ``TeamProfile``.
    """
    profile = getattr(info.context.request, "profile", None)
    if not profile or not isinstance(profile, TeamProfile):
        raise ValidationError(f"Authentication required. Only team members can {action}.")
    return profile


def _unique_uuids(raw_ids) -> List[UUID]:
    """Parse an iterable of IDs into UUIDs, dropping duplicates but keeping order.

    De-duplicating matters because a ``pk__in`` query returns each row once; comparing
    its length against a list that still contains duplicates would wrongly flag valid
    input as containing invalid IDs.

    Raises:
        ValueError: If any ID is not a well-formed UUID string.
    """
    return list(dict.fromkeys(UUID(str(raw)) for raw in raw_ids))


@database_sync_to_async
def _load_service_session(session_id) -> ServiceSession:
    """Fetch a ServiceSession by primary key with its relations and completed tasks loaded."""
    return (
        ServiceSession.objects
        .select_related(*_SERVICE_SESSION_RELATIONS)
        .prefetch_related("completed_tasks")
        .get(pk=session_id)
    )


@database_sync_to_async
def _load_project_session(session_id) -> ProjectSession:
    """Fetch a ProjectSession by primary key with its relations and completed tasks loaded."""
    return (
        ProjectSession.objects
        .select_related(*_PROJECT_SESSION_RELATIONS)
        .prefetch_related("completed_tasks")
        .get(pk=session_id)
    )


async def _get_service_task(task_pk: UUID) -> Task:
    """Load a service Task by primary key, reporting an unknown ID as a ValidationError."""
    try:
        return await database_sync_to_async(Task.objects.get)(pk=task_pk)
    except Task.DoesNotExist as exc:
        raise ValidationError("Invalid task ID.") from exc


async def _get_project_task(task_pk: UUID) -> ProjectScopeTask:
    """Load a ProjectScopeTask by primary key, reporting an unknown ID as a ValidationError."""
    try:
        return await database_sync_to_async(ProjectScopeTask.objects.get)(pk=task_pk)
    except ProjectScopeTask.DoesNotExist as exc:
        raise ValidationError("Invalid project task ID.") from exc


# GraphQL mutations for opening, closing and reverting service/project sessions and
# for recording task completions. Every mutation requires a TeamProfile on the
# request (per the original comments, attached by the Oathkeeper auth layer) and
# publishes a domain event after the database work succeeds.
@strawberry.type
class Mutation:
    @strawberry.mutation(description="Revert an active service session back to scheduled (deletes the active session)")
    async def revert_service_session(self, input: RevertServiceSessionInput, info: Info) -> bool:
        """Revert the service's active session and publish a "reverted" event.

        Returns True on success. Raises ValidationError if the caller is not a team member.
        """
        profile = _require_team_profile(info, "revert service sessions")

        service_pk = UUID(str(input.service_id))
        svc = SessionService()
        result = await database_sync_to_async(svc.revert_session)(
            entity_type="service",
            entity_id=service_pk,
            actor=profile,
        )

        await publish_service_session_reverted(
            session_id=str(result.session_id),
            service_id=str(result.entity_id),
            triggered_by=profile,
        )
        return True

    @strawberry.mutation(description="Open a service session for a scheduled service")
    async def open_service_session(self, input: OpenServiceSessionInput, info: Info) -> ServiceSessionType:
        """Open a session for the given service and return the freshly loaded session.

        Raises ValidationError if the caller is not a team member.
        """
        profile = _require_team_profile(info, "open service sessions")

        service_pk = UUID(str(input.service_id))
        svc = SessionService()
        result = await database_sync_to_async(svc.open_session)(
            entity_type="service",
            entity_id=service_pk,
            actor=profile,
        )

        session = await _load_service_session(result.session_id)

        await publish_service_session_opened(
            session_id=str(result.session_id),
            service_id=str(result.entity_id),
            triggered_by=profile,
        )
        # The ORM instance is returned as-is; the double cast only satisfies the type checker.
        return cast(ServiceSessionType, cast(object, session))

    @strawberry.mutation(description="Close the active service session and record completed tasks")
    async def close_service_session(self, input: CloseServiceSessionInput, info: Info) -> ServiceSessionType:
        """Close the service's active session, recording the given tasks as completed.

        Raises ValidationError if the caller is not a team member or if any supplied
        task ID does not match an existing Task.
        """
        profile = _require_team_profile(info, "close service sessions")

        service_pk = UUID(str(input.service_id))
        # De-duplicate so a repeated ID is not mistaken for a missing one below.
        task_pks = _unique_uuids(input.task_ids)

        tasks: List[Task] = await database_sync_to_async(
            lambda: list(Task.objects.filter(pk__in=task_pks))
        )()
        if len(tasks) != len(task_pks):
            raise ValidationError("One or more task IDs are invalid.")

        svc = SessionService()
        result = await database_sync_to_async(svc.close_session)(
            entity_type="service",
            entity_id=service_pk,
            actor=profile,
            tasks=tasks,
        )

        session = await _load_service_session(result.session_id)

        # Account name and service date are passed along as event metadata for notifications.
        account_name = session.account.name if session.account else None
        service_date = str(session.service.date) if session.service else None

        await publish_service_session_closed(
            session_id=str(result.session_id),
            service_id=str(result.entity_id),
            triggered_by=profile,
            metadata={
                'account_name': account_name,
                'date': service_date,
            },
        )
        return cast(ServiceSessionType, cast(object, session))

    @strawberry.mutation(description="Add a task completion to an active service session")
    async def add_task_completion(self, info: Info, service_id: strawberry.ID, task_id: strawberry.ID,
                                  notes: str | None = None) -> ServiceSessionType:
        """Record a task as completed on the service's active session.

        Raises ValidationError if the caller is not a team member or the task ID is unknown.
        """
        profile = _require_team_profile(info, "add task completions")

        svc = SessionService()
        task_pk = UUID(str(task_id))
        service_pk = UUID(str(service_id))

        # The task is loaded up front: it validates the ID and supplies the name for the event.
        task = await _get_service_task(task_pk)

        session_id = await database_sync_to_async(svc.add_task_completion)(
            service_id=service_pk,
            task_id=task_pk,
            actor=profile,
            notes=notes,
        )

        session = await _load_service_session(session_id)

        await publish_service_task_completed(
            task_id=str(task_pk),
            service_id=str(service_pk),
            task_name=task.checklist_description,
            triggered_by=profile,
        )
        return cast(ServiceSessionType, cast(object, session))

    @strawberry.mutation(description="Remove a task completion from an active service session")
    async def remove_task_completion(self, info: Info, service_id: strawberry.ID,
                                     task_id: strawberry.ID) -> ServiceSessionType:
        """Remove a task completion from the service's active session.

        Raises ValidationError if the caller is not a team member or the task ID is unknown.
        """
        profile = _require_team_profile(info, "remove task completions")

        svc = SessionService()
        task_pk = UUID(str(task_id))
        service_pk = UUID(str(service_id))

        # The task is loaded up front: it validates the ID and supplies the name for the event.
        task = await _get_service_task(task_pk)

        session_id = await database_sync_to_async(svc.remove_task_completion)(
            service_id=service_pk,
            task_id=task_pk,
        )

        session = await _load_service_session(session_id)

        await publish_service_task_uncompleted(
            task_id=str(task_pk),
            service_id=str(service_pk),
            task_name=task.checklist_description,
            triggered_by=profile,
        )
        return cast(ServiceSessionType, cast(object, session))

    @strawberry.mutation(description="Add a task completion to an active project session")
    async def add_project_task_completion(self, info: Info, project_id: strawberry.ID, task_id: strawberry.ID,
                                          notes: str | None = None) -> ProjectSessionType:
        """Record a project task as completed on the project's active session.

        Raises ValidationError if the caller is not a team member or the task ID is unknown.
        """
        profile = _require_team_profile(info, "add project task completions")

        svc = SessionService()
        task_pk = UUID(str(task_id))
        project_pk = UUID(str(project_id))

        # The task is loaded up front: it validates the ID and supplies the name for the event.
        task = await _get_project_task(task_pk)

        session_id = await database_sync_to_async(svc.add_project_task_completion)(
            project_id=project_pk,
            task_id=task_pk,
            actor=profile,
            notes=notes,
        )

        session = await _load_project_session(session_id)

        await publish_project_task_completed(
            task_id=str(task_pk),
            project_id=str(project_pk),
            task_name=task.checklist_description,
            triggered_by=profile,
        )
        return cast(ProjectSessionType, cast(object, session))

    @strawberry.mutation(description="Remove a task completion from an active project session")
    async def remove_project_task_completion(self, info: Info, project_id: strawberry.ID,
                                             task_id: strawberry.ID) -> ProjectSessionType:
        """Remove a project task completion from the project's active session.

        Raises ValidationError if the caller is not a team member or the task ID is unknown.
        """
        profile = _require_team_profile(info, "remove project task completions")

        svc = SessionService()
        task_pk = UUID(str(task_id))
        project_pk = UUID(str(project_id))

        # The task is loaded up front: it validates the ID and supplies the name for the event.
        task = await _get_project_task(task_pk)

        session_id = await database_sync_to_async(svc.remove_project_task_completion)(
            project_id=project_pk,
            task_id=task_pk,
        )

        session = await _load_project_session(session_id)

        await publish_project_task_uncompleted(
            task_id=str(task_pk),
            project_id=str(project_pk),
            task_name=task.checklist_description,
            triggered_by=profile,
        )
        return cast(ProjectSessionType, cast(object, session))

    @strawberry.mutation(description="Start a new ProjectSession for a scheduled project")
    async def open_project_session(self, input: ProjectSessionStartInput, info: Info) -> ProjectSessionType:
        """Open a session for the given project and return the freshly loaded session.

        Raises ValidationError if the caller is not a team member.
        """
        profile = _require_team_profile(info, "start project sessions")

        project_pk = UUID(str(input.project_id))
        svc = SessionService()
        result = await database_sync_to_async(svc.open_session)(
            entity_type="project",
            entity_id=project_pk,
            actor=profile,
        )

        session = await _load_project_session(result.session_id)

        # Notify listeners that the project was updated (status change, etc.)
        await pubsub.publish("project_updated", result.entity_id)
        await pubsub.publish("project_session_created", result.session_id)

        await publish_project_session_opened(
            session_id=str(result.session_id),
            project_id=str(result.entity_id),
            triggered_by=profile,
        )
        return cast(ProjectSessionType, cast(object, session))

    @strawberry.mutation(description="Close the active ProjectSession")
    async def close_project_session(self, input: ProjectSessionCloseInput, info: Info) -> ProjectSessionType:
        """Close the project's active session, optionally recording completed tasks.

        Raises ValidationError if the caller is not a team member or if any supplied
        task ID does not match an existing ProjectScopeTask.
        """
        profile = _require_team_profile(info, "close project sessions")

        project_pk = UUID(str(input.project_id))
        # De-duplicate so a repeated ID is not mistaken for a missing one below.
        task_pks = _unique_uuids(input.completed_task_ids or [])

        tasks: List[ProjectScopeTask] = []
        if task_pks:
            tasks = await database_sync_to_async(
                lambda: list(ProjectScopeTask.objects.filter(pk__in=task_pks))
            )()
            if len(tasks) != len(task_pks):
                raise ValidationError("One or more project task IDs are invalid.")

        # Let the service manage select_for_update inside its @transaction.atomic
        svc = SessionService()
        result = await database_sync_to_async(svc.close_session)(
            entity_type="project",
            entity_id=project_pk,
            actor=profile,
            # None (rather than an empty list) is passed when no task IDs were supplied.
            tasks=tasks if task_pks else None,
        )

        session = await _load_project_session(result.session_id)

        await pubsub.publish("project_updated", result.entity_id)
        await pubsub.publish("project_session_closed", result.session_id)

        # Get account/customer name and project date for notifications
        if session.account:
            account_name = session.account.name
        elif session.customer:
            account_name = session.customer.name
        else:
            account_name = None
        project_date = str(session.project.date) if session.project and session.project.date else None

        await publish_project_session_closed(
            session_id=str(result.session_id),
            project_id=str(result.entity_id),
            triggered_by=profile,
            metadata={
                'account_name': account_name,
                'date': project_date,
            },
        )
        return cast(ProjectSessionType, cast(object, session))

    @strawberry.mutation(description="Revert the active ProjectSession back to scheduled (deletes the active session)")
    async def revert_project_session(self, input: ProjectSessionRevertInput, info: Info) -> bool:
        """Revert the project's active session and publish the related events.

        Returns True on success. Raises ValidationError if the caller is not a team member.
        """
        profile = _require_team_profile(info, "revert project sessions")

        project_pk = UUID(str(input.project_id))
        svc = SessionService()
        result = await database_sync_to_async(svc.revert_session)(
            entity_type="project",
            entity_id=project_pk,
            actor=profile,
        )

        # Publish project updated to reflect status change
        await pubsub.publish("project_updated", result.entity_id)

        await publish_project_session_reverted(
            session_id=str(result.session_id),
            project_id=str(result.entity_id),
            triggered_by=profile,
        )
        return True