nexus-5/core/graphql/pubsub.py
2026-01-26 11:09:40 -05:00

49 lines
1.5 KiB
Python

from contextlib import asynccontextmanager
from typing import AsyncGenerator, Any, AsyncIterator
from channels.layers import get_channel_layer
class PubSub:
"""
A PubSub implementation that uses the Django Channels layer.
"""
def __init__(self):
self.channel_layer = get_channel_layer()
async def publish(self, channel: str, message: Any):
"""
Publishes a message to the given channel.
"""
await self.channel_layer.group_send(
channel,
{
"type": "channel.message",
"message": message,
},
)
@asynccontextmanager
async def subscribe(self, channel: str) -> AsyncGenerator[AsyncIterator[Any], None]:
"""
Subscribes to a channel and yields an async iterator over messages.
Designed to be used with 'async with'.
"""
channel_name = await self.channel_layer.new_channel()
await self.channel_layer.group_add(channel, channel_name)
async def _subscriber():
while True:
message = await self.channel_layer.receive(channel_name)
if message.get("type") == "channel.message":
yield message["message"]
try:
yield _subscriber()
finally:
# This cleanup code will run automatically when the 'async with' block is exited.
await self.channel_layer.group_discard(channel, channel_name)
# Create a single global instance for the application to use.
pubsub = PubSub()