212 lines
9.1 KiB
Python
212 lines
9.1 KiB
Python
from typing import AsyncGenerator
|
|
import strawberry
|
|
from channels.db import database_sync_to_async
|
|
from strawberry.types import Info
|
|
from strawberry.relay import GlobalID
|
|
|
|
from core.graphql.pubsub import pubsub
|
|
from core.graphql.types.messaging import ConversationType, MessageType
|
|
from core.graphql.utils import _extract_id
|
|
from core.models.messaging import Conversation, Message
|
|
|
|
|
|
def _require_authenticated_user(info: Info):
    """Return the requesting user, or raise if the request is anonymous.

    Args:
        info: Strawberry resolver info; ``info.context.user`` is read.

    Returns:
        The user object found on the context.

    Raises:
        PermissionError: If the context has no user or the user is not
            authenticated.
    """
    user = info.context.user
    if not user or not user.is_authenticated:
        raise PermissionError("Authentication required")
    return user


def _is_requested_conversation(payload_conversation_id: object, conversation_id: GlobalID) -> bool:
    """Return True when a payload's conversation id refers to ``conversation_id``.

    ``str(GlobalID)`` is the base64 encoding of ``"<type>:<node_id>"``, so
    comparing it with a raw primary key never matches. The subscriptions below
    pass the payload id straight to ``objects.get(pk=...)``, i.e. they treat it
    as a raw key, so the comparison is made against ``node_id``. The encoded
    form is still accepted so publishers that send a global id keep working.
    """
    if payload_conversation_id is None:
        return False
    candidate = str(payload_conversation_id)
    return candidate == str(conversation_id.node_id) or candidate == str(conversation_id)


async def _get_or_none(queryset, pk):
    """Fetch ``queryset.get(pk=pk)`` off the event loop; return None if missing.

    ``queryset`` may be a manager or a queryset; both expose ``.get`` and
    ``.model``. A missing row is not an error here: the object may have been
    deleted between publishing the event and handling it.
    """
    try:
        return await database_sync_to_async(queryset.get)(pk=pk)
    except queryset.model.DoesNotExist:
        return None


def _resolve_profile(user):
    """Return the user's team profile, else customer profile, else None.

    Synchronous on purpose: the attribute access may hit the database
    (presumably these are reverse one-to-one relations -- TODO confirm), so
    callers in async code must wrap this in ``database_sync_to_async``.
    """
    if hasattr(user, 'team_profile'):
        return user.team_profile
    if hasattr(user, 'customer_profile'):
        return user.customer_profile
    return None


def _is_participant(message, profile) -> bool:
    """Return True if ``profile`` is a participant of ``message.conversation``.

    Synchronous (runs ORM queries); wrap in ``database_sync_to_async``.
    """
    # Imported lazily, as in the original code.
    from django.contrib.contenttypes.models import ContentType

    content_type = ContentType.objects.get_for_model(type(profile))
    return message.conversation.participants.filter(
        participant_content_type=content_type,
        participant_object_id=profile.id,
    ).exists()


@strawberry.type
class Subscription:
    """GraphQL subscriptions for conversations and messages.

    Every subscription requires an authenticated user and relays events
    received from ``pubsub`` channels.
    """

    @strawberry.subscription(description="Subscribe to new conversations")
    async def conversation_created(self, info: Info) -> AsyncGenerator[ConversationType, None]:
        """Yield each newly created conversation.

        Raises:
            PermissionError: If the requester is not authenticated.
        """
        _require_authenticated_user(info)

        # NOTE(review): every authenticated user receives every conversation;
        # no participant check is made here -- confirm this is intended.
        async with pubsub.subscribe("conversation_created") as subscriber:
            async for payload in subscriber:
                conversation_pk = await _extract_id(payload)
                instance = await _get_or_none(Conversation.objects, conversation_pk)
                if instance is not None:
                    yield instance

    @strawberry.subscription(description="Subscribe to conversation updates")
    async def conversation_updated(self, info: Info) -> AsyncGenerator[ConversationType, None]:
        """Yield each conversation that has been updated.

        Raises:
            PermissionError: If the requester is not authenticated.
        """
        _require_authenticated_user(info)

        # NOTE(review): no participant check is made here -- confirm intended.
        async with pubsub.subscribe("conversation_updated") as subscriber:
            async for payload in subscriber:
                conversation_pk = await _extract_id(payload)
                instance = await _get_or_none(Conversation.objects, conversation_pk)
                if instance is not None:
                    yield instance

    @strawberry.subscription(description="Subscribe to new messages in a specific conversation")
    async def message_sent(
        self,
        info: Info,
        conversation_id: GlobalID,
    ) -> AsyncGenerator[MessageType, None]:
        """Yield each new message sent in the given conversation.

        Clients should subscribe to this for each conversation they have open.

        Args:
            info: Strawberry resolver info.
            conversation_id: Global id of the conversation to watch.

        Raises:
            PermissionError: If the requester is not authenticated.
        """
        _require_authenticated_user(info)

        # NOTE(review): the requester is not checked to be a participant of
        # ``conversation_id`` -- confirm whether that is enforced elsewhere.
        async with pubsub.subscribe("message_sent") as subscriber:
            async for payload in subscriber:
                # Events are expected to be dicts carrying ``message_id`` and
                # ``conversation_id``; anything else is ignored.
                if not isinstance(payload, dict):
                    continue
                if not _is_requested_conversation(payload.get("conversation_id"), conversation_id):
                    continue

                message_id = payload.get("message_id")
                if message_id is None:
                    continue
                instance = await _get_or_none(Message.objects, message_id)
                if instance is not None:
                    yield instance

    @strawberry.subscription(description="Subscribe to new messages across all conversations")
    async def message_received(self, info: Info) -> AsyncGenerator[MessageType, None]:
        """Yield new messages from every conversation the requester participates in.

        Messages sent by the requester are not yielded. Useful for showing
        notifications or updating unread counts.

        Raises:
            PermissionError: If the requester is not authenticated, or has
                neither a team profile nor a customer profile.
        """
        user = _require_authenticated_user(info)

        # Resolve the profile in a worker thread: touching the relation from
        # the event loop may run a synchronous query.
        profile = await database_sync_to_async(_resolve_profile)(user)
        if not profile:
            raise PermissionError("User profile not found")

        messages = Message.objects.select_related('conversation')
        async with pubsub.subscribe("message_sent") as subscriber:
            async for payload in subscriber:
                if not isinstance(payload, dict):
                    continue
                message_id = payload.get("message_id")
                if message_id is None:
                    continue

                instance = await _get_or_none(messages, message_id)
                if instance is None:
                    continue

                if not await database_sync_to_async(_is_participant)(instance, profile):
                    continue

                # Don't notify senders of their own messages. Ids are compared
                # as strings so a string/UUID column still matches an int key.
                # NOTE(review): only the object id is compared, not the
                # sender's content type -- a team profile and a customer
                # profile sharing an id would be treated as the same sender.
                if str(instance.sender_object_id) != str(profile.id):
                    yield instance

    @strawberry.subscription(description="Subscribe to conversation read events")
    async def conversation_read(
        self,
        info: Info,
        conversation_id: GlobalID,
    ) -> AsyncGenerator[ConversationType, None]:
        """Yield the conversation whenever someone marks it as read.

        Useful for showing read receipts.

        Args:
            info: Strawberry resolver info.
            conversation_id: Global id of the conversation to watch.

        Raises:
            PermissionError: If the requester is not authenticated.
        """
        _require_authenticated_user(info)

        async with pubsub.subscribe("conversation_read") as subscriber:
            async for payload in subscriber:
                if not isinstance(payload, dict):
                    continue
                conversation_pk = payload.get("conversation_id")
                if not _is_requested_conversation(conversation_pk, conversation_id):
                    continue

                instance = await _get_or_none(Conversation.objects, conversation_pk)
                if instance is not None:
                    yield instance

    @strawberry.subscription(description="Subscribe to participant changes")
    async def participant_added(
        self,
        info: Info,
        conversation_id: GlobalID,
    ) -> AsyncGenerator[ConversationType, None]:
        """Yield the conversation whenever a participant is added to it.

        Args:
            info: Strawberry resolver info.
            conversation_id: Global id of the conversation to watch.

        Raises:
            PermissionError: If the requester is not authenticated.
        """
        _require_authenticated_user(info)

        async with pubsub.subscribe("participant_added") as subscriber:
            async for payload in subscriber:
                if not isinstance(payload, dict):
                    continue
                conversation_pk = payload.get("conversation_id")
                if not _is_requested_conversation(conversation_pk, conversation_id):
                    continue

                instance = await _get_or_none(Conversation.objects, conversation_pk)
                if instance is not None:
                    yield instance

    @strawberry.subscription(description="Subscribe to conversation deletion events")
    async def conversation_deleted(self, info: Info) -> AsyncGenerator[strawberry.ID, None]:
        """Yield the id of each conversation that is deleted.

        Raises:
            PermissionError: If the requester is not authenticated.
        """
        _require_authenticated_user(info)

        async with pubsub.subscribe("conversation_deleted") as subscriber:
            async for payload in subscriber:
                deleted_id = await _extract_id(payload)
                if deleted_id is None:
                    continue
                # ``strawberry.ID`` does not convert its argument, so build
                # the string form explicitly.
                yield strawberry.ID(str(deleted_id))

    @strawberry.subscription(description="Subscribe to message deletion events")
    async def message_deleted(
        self,
        info: Info,
        conversation_id: GlobalID,
    ) -> AsyncGenerator[strawberry.ID, None]:
        """Yield the id of each message deleted from the given conversation.

        Args:
            info: Strawberry resolver info.
            conversation_id: Global id of the conversation to watch.

        Raises:
            PermissionError: If the requester is not authenticated.
        """
        _require_authenticated_user(info)

        async with pubsub.subscribe("message_deleted") as subscriber:
            async for payload in subscriber:
                if not isinstance(payload, dict):
                    continue
                if not _is_requested_conversation(payload.get("conversation_id"), conversation_id):
                    continue

                message_id = payload.get("message_id")
                # An event without a message id cannot produce a valid ID.
                if message_id is None:
                    continue
                yield strawberry.ID(str(message_id))
|