2026-01-26 11:09:40 -05:00

189 lines
7.3 KiB
Python

from typing import cast
import strawberry
from strawberry.types import Info
from channels.db import database_sync_to_async
from core.graphql.pubsub import pubsub
from core.graphql.inputs.account import (
AccountInput, AccountUpdateInput,
AccountAddressInput, AccountAddressUpdateInput,
AccountContactInput, AccountContactUpdateInput,
)
from core.graphql.types.account import (
AccountType,
AccountAddressType,
AccountContactType,
)
from core.models.account import Account, AccountAddress, AccountContact
from core.graphql.utils import create_object, update_object, delete_object
from core.services.events import (
publish_account_created, publish_account_updated, publish_account_deleted,
publish_account_status_changed,
publish_account_address_created, publish_account_address_updated, publish_account_address_deleted,
publish_account_contact_created, publish_account_contact_updated, publish_account_contact_deleted,
)
@strawberry.type
class Mutation:
@strawberry.mutation(description="Create a new account")
async def create_account(self, input: AccountInput, info: Info) -> AccountType:
instance = await create_object(input, Account)
await pubsub.publish("account_created", instance.id)
# Publish event for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_created(
account_id=str(instance.id),
triggered_by=profile,
metadata={'customer_id': str(instance.customer_id), 'status': instance.status, 'name': instance.name}
)
return cast(AccountType, instance)
@strawberry.mutation(description="Update an existing account")
async def update_account(self, input: AccountUpdateInput, info: Info) -> AccountType:
# Get old status for comparison
old_account = await database_sync_to_async(Account.objects.get)(pk=input.id.node_id)
old_status = old_account.status
instance = await update_object(input, Account)
await pubsub.publish("account_updated", instance.id)
# Publish events for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_updated(
account_id=str(instance.id),
triggered_by=profile,
metadata={'name': instance.name}
)
# Check for status change
if hasattr(input, 'status') and input.status != old_status:
await publish_account_status_changed(
account_id=str(instance.id),
old_status=old_status,
new_status=instance.status,
triggered_by=profile
)
return cast(AccountType, instance)
@strawberry.mutation(description="Delete an existing account")
async def delete_account(self, id: strawberry.ID, info: Info) -> strawberry.ID:
instance = await delete_object(id, Account)
if not instance:
raise ValueError(f"Account with ID {id} does not exist")
await pubsub.publish("account_deleted", id)
# Publish event for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_deleted(
account_id=str(id),
triggered_by=profile,
metadata={'name': instance.name}
)
return id
@strawberry.mutation(description="Create a new account address")
async def create_account_address(
self, input: AccountAddressInput, info: Info
) -> AccountAddressType:
instance = await create_object(input, AccountAddress)
await pubsub.publish("account_address_created", instance.id)
# Publish event for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_address_created(
address_id=str(instance.id),
account_id=str(instance.account_id),
triggered_by=profile
)
return cast(AccountAddressType, instance)
@strawberry.mutation(description="Update an existing account address")
async def update_account_address(
self, input: AccountAddressUpdateInput, info: Info
) -> AccountAddressType:
instance = await update_object(input, AccountAddress)
await pubsub.publish("account_address_updated", instance.id)
# Publish event for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_address_updated(
address_id=str(instance.id),
account_id=str(instance.account_id),
triggered_by=profile
)
return cast(AccountAddressType, instance)
@strawberry.mutation(description="Delete an existing account address")
async def delete_account_address(self, id: strawberry.ID, info: Info) -> strawberry.ID:
instance = await delete_object(id, AccountAddress)
if not instance:
raise ValueError(f"AccountAddress with ID {id} does not exist")
await pubsub.publish("account_address_deleted", id)
# Publish event for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_address_deleted(
address_id=str(id),
account_id=str(instance.account_id),
triggered_by=profile
)
return id
@strawberry.mutation(description="Create a new account contact")
async def create_account_contact(
self, input: AccountContactInput, info: Info
) -> AccountContactType:
instance = await create_object(input, AccountContact)
await pubsub.publish("account_contact_created", instance.id)
# Publish event for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_contact_created(
contact_id=str(instance.id),
account_id=str(instance.account_id),
triggered_by=profile
)
return cast(AccountContactType, instance)
@strawberry.mutation(description="Update an existing account contact")
async def update_account_contact(
self, input: AccountContactUpdateInput, info: Info
) -> AccountContactType:
instance = await update_object(input, AccountContact)
await pubsub.publish("account_contact_updated", instance.id)
# Publish event for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_contact_updated(
contact_id=str(instance.id),
account_id=str(instance.account_id),
triggered_by=profile
)
return cast(AccountContactType, instance)
@strawberry.mutation(description="Delete an existing account contact")
async def delete_account_contact(self, id: strawberry.ID, info: Info) -> strawberry.ID:
instance = await delete_object(id, AccountContact)
if not instance:
raise ValueError(f"AccountContact with ID {id} does not exist")
await pubsub.publish("account_contact_deleted", id)
# Publish event for notifications
profile = getattr(info.context.request, 'profile', None)
await publish_account_contact_deleted(
contact_id=str(id),
account_id=str(instance.account_id),
triggered_by=profile
)
return id