2026-01-26 11:09:40 -05:00

96 lines
4.4 KiB
Python

from typing import AsyncGenerator
import strawberry
from channels.db import database_sync_to_async
from strawberry.types import Info
from core.graphql.pubsub import pubsub
from core.graphql.types.profile import CustomerProfileType, TeamProfileType
from core.graphql.utils import _extract_id
from core.models.profile import CustomerProfile, TeamProfile
@strawberry.type
class Subscription:
# CustomerProfile subscriptions
@strawberry.subscription(description="Subscribe to customer profile creation events")
async def customer_profile_created(self, info: Info) -> AsyncGenerator[CustomerProfileType, None]:
user = info.context.user
if not user or not user.is_authenticated:
raise PermissionError("Authentication required")
async with pubsub.subscribe("customer_profile_created") as subscriber:
async for payload in subscriber:
entity_id = await _extract_id(payload)
try:
instance = await database_sync_to_async(CustomerProfile.objects.get)(pk=entity_id)
except CustomerProfile.DoesNotExist:
continue
yield instance
@strawberry.subscription(description="Subscribe to customer profile updates")
async def customer_profile_updated(self, info: Info) -> AsyncGenerator[CustomerProfileType, None]:
user = info.context.user
if not user or not user.is_authenticated:
raise PermissionError("Authentication required")
async with pubsub.subscribe("customer_profile_updated") as subscriber:
async for payload in subscriber:
entity_id = await _extract_id(payload)
try:
instance = await database_sync_to_async(CustomerProfile.objects.get)(pk=entity_id)
except CustomerProfile.DoesNotExist:
continue
yield instance
@strawberry.subscription(description="Subscribe to customer profile deletion events")
async def customer_profile_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]:
user = info.context.user
if not user or not user.is_authenticated:
raise PermissionError("Authentication required")
async with pubsub.subscribe("customer_profile_deleted") as subscriber:
async for payload in subscriber:
entity_id = await _extract_id(payload)
yield strawberry.ID(entity_id)
# TeamProfile subscriptions
@strawberry.subscription(description="Subscribe to team profile creation events")
async def team_profile_created(self, info: Info) -> AsyncGenerator[TeamProfileType, None]:
user = info.context.user
if not user or not user.is_authenticated:
raise PermissionError("Authentication required")
async with pubsub.subscribe("team_profile_created") as subscriber:
async for payload in subscriber:
entity_id = await _extract_id(payload)
try:
instance = await database_sync_to_async(TeamProfile.objects.get)(pk=entity_id)
except TeamProfile.DoesNotExist:
continue
yield instance
@strawberry.subscription(description="Subscribe to team profile updates")
async def team_profile_updated(self, info: Info) -> AsyncGenerator[TeamProfileType, None]:
user = info.context.user
if not user or not user.is_authenticated:
raise PermissionError("Authentication required")
async with pubsub.subscribe("team_profile_updated") as subscriber:
async for payload in subscriber:
entity_id = await _extract_id(payload)
try:
instance = await database_sync_to_async(TeamProfile.objects.get)(pk=entity_id)
except TeamProfile.DoesNotExist:
continue
yield instance
@strawberry.subscription(description="Subscribe to team profile deletion events")
async def team_profile_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]:
user = info.context.user
if not user or not user.is_authenticated:
raise PermissionError("Authentication required")
async with pubsub.subscribe("team_profile_deleted") as subscriber:
async for payload in subscriber:
entity_id = await _extract_id(payload)
yield strawberry.ID(entity_id)