93 lines
3.1 KiB
Python
93 lines
3.1 KiB
Python
"""
Customer domain metadata enrichment.

Handles Customer, CustomerAddress, and CustomerContact entities.
"""
|
|
from typing import Dict, Any, Set
|
|
|
|
from asgiref.sync import sync_to_async
|
|
|
|
from core.models.enums import EventTypeChoices
|
|
|
|
|
|
# Event types that make up the customer domain, grouped by the entity
# they refer to (Customer, CustomerAddress, CustomerContact).
CUSTOMER_EVENTS: Set[str] = {
    # Customer
    EventTypeChoices.CUSTOMER_CREATED,
    EventTypeChoices.CUSTOMER_UPDATED,
    EventTypeChoices.CUSTOMER_DELETED,
    EventTypeChoices.CUSTOMER_STATUS_CHANGED,
    # CustomerAddress
    EventTypeChoices.CUSTOMER_ADDRESS_CREATED,
    EventTypeChoices.CUSTOMER_ADDRESS_UPDATED,
    EventTypeChoices.CUSTOMER_ADDRESS_DELETED,
    # CustomerContact
    EventTypeChoices.CUSTOMER_CONTACT_CREATED,
    EventTypeChoices.CUSTOMER_CONTACT_UPDATED,
    EventTypeChoices.CUSTOMER_CONTACT_DELETED,
}
|
|
|
|
|
|
async def enrich_customer_domain(entity_type: str, entity_id: str) -> Dict[str, Any]:
    """Return enrichment metadata for a customer-domain entity.

    Args:
        entity_type: Entity kind; 'Customer', 'CustomerAddress' and
            'CustomerContact' are recognised (exact, case-sensitive match).
        entity_id: Identifier handed unchanged to the matching loader.

    Returns:
        The dict produced by the loader for ``entity_type``, or an empty
        dict when ``entity_type`` is not one of the recognised kinds.
    """
    # Pick the loader first, then await it once at the end.
    if entity_type == 'Customer':
        loader = _load_customer_metadata
    elif entity_type == 'CustomerAddress':
        loader = _load_customer_address_metadata
    elif entity_type == 'CustomerContact':
        loader = _load_customer_contact_metadata
    else:
        # Unrecognised entity types contribute no metadata.
        return {}

    return await loader(entity_id)
|
|
|
|
|
|
async def _load_customer_metadata(entity_id: str) -> Dict[str, Any]:
    """Build the metadata dict for one Customer.

    Args:
        entity_id: Value used as the primary key in the Customer lookup.

    Returns:
        ``{'customer_name': ...}`` holding the customer's name (or ``''``
        when the name is falsy); an empty dict when no Customer with that
        primary key exists.
    """
    # Function-local import: the model is resolved when this coroutine runs.
    from core.models.customer import Customer

    def fetch_customer_fields() -> Dict[str, Any]:
        # Synchronous ORM lookup; wrapped with sync_to_async below so it
        # can be awaited.
        try:
            record = Customer.objects.get(pk=entity_id)
            display_name = record.name or ''
        except Customer.DoesNotExist:
            return {}
        return {'customer_name': display_name}

    return await sync_to_async(fetch_customer_fields)()
|
|
|
|
|
|
async def _load_customer_address_metadata(entity_id: str) -> Dict[str, Any]:
    """Build the metadata dict for one CustomerAddress.

    Args:
        entity_id: Value used as the primary key in the CustomerAddress
            lookup.

    Returns:
        A dict with ``'customer_id'`` (stringified), plus ``'customer_name'``
        when the related customer is truthy, plus ``'address'`` (the
        non-empty street address and city joined with ``', '``) when at
        least one of those fields is set. An empty dict when no
        CustomerAddress with that primary key exists.
    """
    # Function-local import: the model is resolved when this coroutine runs.
    from core.models.customer import CustomerAddress

    def fetch_address_fields() -> Dict[str, Any]:
        # Synchronous ORM lookup; wrapped with sync_to_async below so it
        # can be awaited.
        try:
            # select_related pulls the customer in the same query.
            row = CustomerAddress.objects.select_related('customer').get(pk=entity_id)

            # NOTE(review): if the customer FK can be null, this stores the
            # string 'None' -- confirm whether that is intended.
            result = {'customer_id': str(row.customer_id)}

            owner = row.customer
            if owner:
                result['customer_name'] = owner.name or ''

            # Keep only the populated parts, street first, then city.
            populated = [
                part for part in (row.street_address, row.city) if part
            ]
            if populated:
                result['address'] = ', '.join(populated)

            return result
        except CustomerAddress.DoesNotExist:
            return {}

    return await sync_to_async(fetch_address_fields)()
|
|
|
|
|
|
async def _load_customer_contact_metadata(entity_id: str) -> Dict[str, Any]:
    """Build the metadata dict for one CustomerContact.

    Args:
        entity_id: Value used as the primary key in the CustomerContact
            lookup.

    Returns:
        A dict with ``'customer_id'`` (stringified), plus ``'customer_name'``
        when the related customer is truthy. An empty dict when no
        CustomerContact with that primary key exists.
    """
    # Function-local import: the model is resolved when this coroutine runs.
    from core.models.customer import CustomerContact

    def fetch_contact_fields() -> Dict[str, Any]:
        # Synchronous ORM lookup; wrapped with sync_to_async below so it
        # can be awaited.
        try:
            # select_related pulls the customer in the same query.
            row = CustomerContact.objects.select_related('customer').get(pk=entity_id)

            # NOTE(review): if the customer FK can be null, this stores the
            # string 'None' -- confirm whether that is intended.
            result = {'customer_id': str(row.customer_id)}

            owner = row.customer
            if owner:
                result['customer_name'] = owner.name or ''

            return result
        except CustomerContact.DoesNotExist:
            return {}

    return await sync_to_async(fetch_contact_fields)()
|