2026-01-26 11:09:40 -05:00

189 lines
7.3 KiB
Python

from typing import cast
import strawberry
from strawberry.types import Info
from channels.db import database_sync_to_async
from core.graphql.pubsub import pubsub
from core.graphql.inputs.customer import (
CustomerInput, CustomerUpdateInput,
CustomerAddressInput, CustomerAddressUpdateInput,
CustomerContactInput, CustomerContactUpdateInput,
)
from core.graphql.types.customer import (
CustomerType,
CustomerAddressType,
CustomerContactType,
)
from core.models.customer import Customer, CustomerAddress, CustomerContact
from core.graphql.utils import create_object, update_object, delete_object
from core.services.events import (
publish_customer_created, publish_customer_updated, publish_customer_deleted,
publish_customer_status_changed,
publish_customer_address_created, publish_customer_address_updated, publish_customer_address_deleted,
publish_customer_contact_created, publish_customer_contact_updated, publish_customer_contact_deleted,
)
def _request_profile(info: Info):
    """Return the ``profile`` attribute of the current request, or ``None``.

    The attribute is read with ``getattr`` and a ``None`` default, so a
    request object that carries no ``profile`` does not raise here.
    """
    return getattr(info.context.request, 'profile', None)


@strawberry.type
class Mutation:
    """GraphQL mutations for customers, customer addresses and customer contacts.

    Every mutation follows the same sequence: persist the change through the
    shared ``create_object`` / ``update_object`` / ``delete_object`` helpers,
    publish the affected id on the matching ``pubsub`` channel, then publish a
    notification event attributed to the profile found on the request.
    """

    @strawberry.mutation(description="Create a new customer")
    async def create_customer(self, input: CustomerInput, info: Info) -> CustomerType:
        """Create a customer, then publish the pubsub message and created event.

        Returns:
            The created ``Customer`` instance (``cast`` only affects static typing).
        """
        instance = await create_object(input, Customer)
        await pubsub.publish("customer_created", instance.id)

        # Publish event for notifications
        profile = _request_profile(info)
        await publish_customer_created(
            customer_id=str(instance.id),
            triggered_by=profile,
            metadata={'status': instance.status, 'name': instance.name},
        )
        return cast(CustomerType, instance)

    @strawberry.mutation(description="Update an existing customer")
    async def update_customer(self, input: CustomerUpdateInput, info: Info) -> CustomerType:
        """Update a customer and publish update / status-change events.

        The customer's status is read before the update so it can be compared
        with the status of the updated instance afterwards.

        Returns:
            The updated ``Customer`` instance (``cast`` only affects static typing).
        """
        # Snapshot the status before the update is applied.
        previous = await database_sync_to_async(Customer.objects.get)(pk=input.id.node_id)
        old_status = previous.status

        instance = await update_object(input, Customer)
        await pubsub.publish("customer_updated", instance.id)

        # Publish events for notifications
        profile = _request_profile(info)
        await publish_customer_updated(
            customer_id=str(instance.id),
            triggered_by=profile,
            metadata={'name': instance.name},
        )

        # Compare the persisted values rather than the raw input: an input
        # field that was not supplied still exists as an attribute and would
        # compare unequal to the stored status, producing a status-changed
        # event whose old and new values are identical.
        if instance.status != old_status:
            await publish_customer_status_changed(
                customer_id=str(instance.id),
                old_status=old_status,
                new_status=instance.status,
                triggered_by=profile,
            )
        return cast(CustomerType, instance)

    @strawberry.mutation(description="Delete an existing customer")
    async def delete_customer(self, id: strawberry.ID, info: Info) -> strawberry.ID:
        """Delete a customer, then publish the pubsub message and deleted event.

        Returns:
            The ``id`` that was passed in.

        Raises:
            ValueError: If ``delete_object`` returns a falsy result for ``id``.
        """
        instance = await delete_object(id, Customer)
        if not instance:
            raise ValueError(f"Customer with ID {id} does not exist")
        await pubsub.publish("customer_deleted", id)

        # Publish event for notifications
        profile = _request_profile(info)
        await publish_customer_deleted(
            customer_id=str(id),
            triggered_by=profile,
            metadata={'name': instance.name},
        )
        return id

    @strawberry.mutation(description="Create a new customer address")
    async def create_customer_address(
        self, input: CustomerAddressInput, info: Info
    ) -> CustomerAddressType:
        """Create a customer address, then publish the pubsub message and created event.

        Returns:
            The created ``CustomerAddress`` instance (``cast`` only affects static typing).
        """
        instance = await create_object(input, CustomerAddress)
        await pubsub.publish("customer_address_created", instance.id)

        # Publish event for notifications
        profile = _request_profile(info)
        await publish_customer_address_created(
            address_id=str(instance.id),
            customer_id=str(instance.customer_id),
            triggered_by=profile,
        )
        return cast(CustomerAddressType, instance)

    @strawberry.mutation(description="Update an existing customer address")
    async def update_customer_address(
        self, input: CustomerAddressUpdateInput, info: Info
    ) -> CustomerAddressType:
        """Update a customer address, then publish the pubsub message and updated event.

        Returns:
            The updated ``CustomerAddress`` instance (``cast`` only affects static typing).
        """
        instance = await update_object(input, CustomerAddress)
        await pubsub.publish("customer_address_updated", instance.id)

        # Publish event for notifications
        profile = _request_profile(info)
        await publish_customer_address_updated(
            address_id=str(instance.id),
            customer_id=str(instance.customer_id),
            triggered_by=profile,
        )
        return cast(CustomerAddressType, instance)

    @strawberry.mutation(description="Delete an existing customer address")
    async def delete_customer_address(self, id: strawberry.ID, info: Info) -> strawberry.ID:
        """Delete a customer address, then publish the pubsub message and deleted event.

        Returns:
            The ``id`` that was passed in.

        Raises:
            ValueError: If ``delete_object`` returns a falsy result for ``id``.
        """
        instance = await delete_object(id, CustomerAddress)
        if not instance:
            raise ValueError(f"CustomerAddress with ID {id} does not exist")
        await pubsub.publish("customer_address_deleted", id)

        # Publish event for notifications
        profile = _request_profile(info)
        await publish_customer_address_deleted(
            address_id=str(id),
            customer_id=str(instance.customer_id),
            triggered_by=profile,
        )
        return id

    @strawberry.mutation(description="Create a new customer contact")
    async def create_customer_contact(
        self, input: CustomerContactInput, info: Info
    ) -> CustomerContactType:
        """Create a customer contact, then publish the pubsub message and created event.

        Returns:
            The created ``CustomerContact`` instance (``cast`` only affects static typing).
        """
        instance = await create_object(input, CustomerContact)
        await pubsub.publish("customer_contact_created", instance.id)

        # Publish event for notifications
        profile = _request_profile(info)
        await publish_customer_contact_created(
            contact_id=str(instance.id),
            customer_id=str(instance.customer_id),
            triggered_by=profile,
        )
        return cast(CustomerContactType, instance)

    @strawberry.mutation(description="Update an existing customer contact")
    async def update_customer_contact(
        self, input: CustomerContactUpdateInput, info: Info
    ) -> CustomerContactType:
        """Update a customer contact, then publish the pubsub message and updated event.

        Returns:
            The updated ``CustomerContact`` instance (``cast`` only affects static typing).
        """
        instance = await update_object(input, CustomerContact)
        await pubsub.publish("customer_contact_updated", instance.id)

        # Publish event for notifications
        profile = _request_profile(info)
        await publish_customer_contact_updated(
            contact_id=str(instance.id),
            customer_id=str(instance.customer_id),
            triggered_by=profile,
        )
        return cast(CustomerContactType, instance)

    @strawberry.mutation(description="Delete an existing customer contact")
    async def delete_customer_contact(self, id: strawberry.ID, info: Info) -> strawberry.ID:
        """Delete a customer contact, then publish the pubsub message and deleted event.

        Returns:
            The ``id`` that was passed in.

        Raises:
            ValueError: If ``delete_object`` returns a falsy result for ``id``.
        """
        instance = await delete_object(id, CustomerContact)
        if not instance:
            raise ValueError(f"CustomerContact with ID {id} does not exist")
        await pubsub.publish("customer_contact_deleted", id)

        # Publish event for notifications
        profile = _request_profile(info)
        await publish_customer_contact_deleted(
            contact_id=str(id),
            customer_id=str(instance.customer_id),
            triggered_by=profile,
        )
        return id