# Python from typing import AsyncGenerator import strawberry from channels.db import database_sync_to_async from strawberry.types import Info from core.graphql.pubsub import pubsub from core.graphql.types.scope_template import ( ScopeTemplateType, AreaTemplateType, TaskTemplateType, ) from core.graphql.types.scope import ScopeType from core.graphql.utils import _extract_id from core.models.scope_template import ScopeTemplate, AreaTemplate, TaskTemplate from core.models.scope import Scope @strawberry.type class Subscription: # ScopeTemplate subscriptions @strawberry.subscription(description="Subscribe to scope template creation events") async def scope_template_created(self, info: Info) -> AsyncGenerator[ScopeTemplateType, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("scope_template_created") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) try: instance = await database_sync_to_async(ScopeTemplate.objects.get)(pk=entity_id) except ScopeTemplate.DoesNotExist: continue yield instance @strawberry.subscription(description="Subscribe to scope template updates") async def scope_template_updated(self, info: Info) -> AsyncGenerator[ScopeTemplateType, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("scope_template_updated") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) try: instance = await database_sync_to_async(ScopeTemplate.objects.get)(pk=entity_id) except ScopeTemplate.DoesNotExist: continue yield instance @strawberry.subscription(description="Subscribe to scope template deletion events") async def scope_template_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("scope_template_deleted") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) yield strawberry.ID(entity_id) # AreaTemplate subscriptions @strawberry.subscription(description="Subscribe to area template creation events") async def area_template_created(self, info: Info) -> AsyncGenerator[AreaTemplateType, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("area_template_created") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) try: instance = await database_sync_to_async(AreaTemplate.objects.get)(pk=entity_id) except AreaTemplate.DoesNotExist: continue yield instance @strawberry.subscription(description="Subscribe to area template updates") async def area_template_updated(self, info: Info) -> AsyncGenerator[AreaTemplateType, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("area_template_updated") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) try: instance = await database_sync_to_async(AreaTemplate.objects.get)(pk=entity_id) except AreaTemplate.DoesNotExist: continue yield instance @strawberry.subscription(description="Subscribe to area template deletion events") async def area_template_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("area_template_deleted") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) yield strawberry.ID(entity_id) # TaskTemplate subscriptions @strawberry.subscription(description="Subscribe to task template creation events") async def task_template_created(self, info: Info) -> AsyncGenerator[TaskTemplateType, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("task_template_created") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) try: instance = await database_sync_to_async(TaskTemplate.objects.get)(pk=entity_id) except TaskTemplate.DoesNotExist: continue yield instance @strawberry.subscription(description="Subscribe to task template updates") async def task_template_updated(self, info: Info) -> AsyncGenerator[TaskTemplateType, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("task_template_updated") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) try: instance = await database_sync_to_async(TaskTemplate.objects.get)(pk=entity_id) except TaskTemplate.DoesNotExist: continue yield instance @strawberry.subscription(description="Subscribe to task template deletion events") async def task_template_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("task_template_deleted") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) yield strawberry.ID(entity_id) # Scope created from the template @strawberry.subscription(description="Subscribe to scopes created from a template") async def scope_created_from_template(self, info: Info) -> AsyncGenerator[ScopeType, None]: user = info.context.user if not user or not user.is_authenticated: raise PermissionError("Authentication required") async with pubsub.subscribe("scope_created_from_template") as subscriber: async for payload in subscriber: entity_id = await _extract_id(payload) try: instance = await database_sync_to_async(Scope.objects.get)(pk=entity_id) except Scope.DoesNotExist: continue yield instance