# File listing metadata (not part of the module):
# 2026-01-26 11:09:40 -05:00 | 142 lines | 6.6 KiB | Python

from typing import AsyncGenerator
import strawberry
from channels.db import database_sync_to_async
from strawberry.types import Info
from core.graphql.pubsub import pubsub
from core.graphql.types.project_scope import (
ProjectScopeType,
ProjectScopeCategoryType,
ProjectScopeTaskType,
)
from core.graphql.utils import _extract_id
from core.models.project_scope import ProjectScope, ProjectScopeCategory, ProjectScopeTask
def _require_authenticated(info: Info) -> None:
    """Reject the request unless ``info.context.user`` is authenticated.

    Raises:
        PermissionError: If the context carries no user, or the user's
            ``is_authenticated`` flag is falsy.
    """
    user = info.context.user
    if not user or not user.is_authenticated:
        raise PermissionError("Authentication required")


async def _load_instance(model, payload):
    """Fetch the ``model`` row identified by a pubsub ``payload``.

    Args:
        model: Model class exposing ``objects.get`` and ``DoesNotExist``.
        payload: Event payload as delivered by the pubsub subscriber; the
            primary key is obtained from it via ``_extract_id``.

    Returns:
        The matching instance, or ``None`` when no row with that primary key
        exists.
    """
    entity_id = await _extract_id(payload)
    try:
        # ``objects.get`` is wrapped with ``database_sync_to_async`` so it can
        # be awaited from this coroutine.
        return await database_sync_to_async(model.objects.get)(pk=entity_id)
    except model.DoesNotExist:
        return None


@strawberry.type
class Subscription:
    """Subscriptions for project scope, category and task events.

    Every subscription checks authentication before subscribing; because the
    resolvers are async generators, a ``PermissionError`` surfaces when the
    generator is first iterated. Created/updated subscriptions yield the model
    instance loaded by primary key and skip events whose row cannot be found;
    deleted subscriptions yield only the id.
    """

    # ProjectScope events
    @strawberry.subscription(description="Subscribe to project scope creation events")
    async def project_scope_created(self, info: Info) -> AsyncGenerator[ProjectScopeType, None]:
        """Yield each ``ProjectScope`` announced on ``project_scope_created``."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_created") as subscriber:
            async for payload in subscriber:
                instance = await _load_instance(ProjectScope, payload)
                if instance is None:
                    continue
                yield instance

    @strawberry.subscription(description="Subscribe to project scope updates")
    async def project_scope_updated(self, info: Info) -> AsyncGenerator[ProjectScopeType, None]:
        """Yield each ``ProjectScope`` announced on ``project_scope_updated``."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_updated") as subscriber:
            async for payload in subscriber:
                instance = await _load_instance(ProjectScope, payload)
                if instance is None:
                    continue
                yield instance

    @strawberry.subscription(description="Subscribe to project scope deletion events")
    async def project_scope_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]:
        """Yield the id carried by each ``project_scope_deleted`` event."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_deleted") as subscriber:
            async for payload in subscriber:
                # No database lookup here: only the extracted id is yielded.
                yield strawberry.ID(await _extract_id(payload))

    # ProjectScopeCategory (areas)
    @strawberry.subscription(description="Subscribe to project scope category creation events")
    async def project_scope_category_created(self, info: Info) -> AsyncGenerator[ProjectScopeCategoryType, None]:
        """Yield each ``ProjectScopeCategory`` announced on ``project_scope_category_created``."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_category_created") as subscriber:
            async for payload in subscriber:
                instance = await _load_instance(ProjectScopeCategory, payload)
                if instance is None:
                    continue
                yield instance

    @strawberry.subscription(description="Subscribe to project scope category updates")
    async def project_scope_category_updated(self, info: Info) -> AsyncGenerator[ProjectScopeCategoryType, None]:
        """Yield each ``ProjectScopeCategory`` announced on ``project_scope_category_updated``."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_category_updated") as subscriber:
            async for payload in subscriber:
                instance = await _load_instance(ProjectScopeCategory, payload)
                if instance is None:
                    continue
                yield instance

    @strawberry.subscription(description="Subscribe to project scope category deletion events")
    async def project_scope_category_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]:
        """Yield the id carried by each ``project_scope_category_deleted`` event."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_category_deleted") as subscriber:
            async for payload in subscriber:
                # No database lookup here: only the extracted id is yielded.
                yield strawberry.ID(await _extract_id(payload))

    # ProjectScopeTask
    @strawberry.subscription(description="Subscribe to project scope task creation events")
    async def project_scope_task_created(self, info: Info) -> AsyncGenerator[ProjectScopeTaskType, None]:
        """Yield each ``ProjectScopeTask`` announced on ``project_scope_task_created``."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_task_created") as subscriber:
            async for payload in subscriber:
                instance = await _load_instance(ProjectScopeTask, payload)
                if instance is None:
                    continue
                yield instance

    @strawberry.subscription(description="Subscribe to project scope task updates")
    async def project_scope_task_updated(self, info: Info) -> AsyncGenerator[ProjectScopeTaskType, None]:
        """Yield each ``ProjectScopeTask`` announced on ``project_scope_task_updated``."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_task_updated") as subscriber:
            async for payload in subscriber:
                instance = await _load_instance(ProjectScopeTask, payload)
                if instance is None:
                    continue
                yield instance

    @strawberry.subscription(description="Subscribe to project scope task deletion events")
    async def project_scope_task_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]:
        """Yield the id carried by each ``project_scope_task_deleted`` event."""
        _require_authenticated(info)
        async with pubsub.subscribe("project_scope_task_deleted") as subscriber:
            async for payload in subscriber:
                # No database lookup here: only the extracted id is yielded.
                yield strawberry.ID(await _extract_id(payload))