136 lines
4.6 KiB
Python
136 lines
4.6 KiB
Python
"""
Account domain metadata enrichment.

Handles Account, AccountAddress, AccountContact, and AccountPunchlist entities.
"""
|
|
from typing import Dict, Any, Set
|
|
|
|
from asgiref.sync import sync_to_async
|
|
|
|
from core.models.enums import EventTypeChoices
|
|
|
|
|
|
# Event types covering the Account, AccountAddress, AccountContact and
# AccountPunchlist entities, grouped below by the entity they refer to.
ACCOUNT_EVENTS: Set[str] = {
    # Account
    EventTypeChoices.ACCOUNT_CREATED,
    EventTypeChoices.ACCOUNT_UPDATED,
    EventTypeChoices.ACCOUNT_DELETED,
    EventTypeChoices.ACCOUNT_STATUS_CHANGED,
    # AccountAddress
    EventTypeChoices.ACCOUNT_ADDRESS_CREATED,
    EventTypeChoices.ACCOUNT_ADDRESS_UPDATED,
    EventTypeChoices.ACCOUNT_ADDRESS_DELETED,
    # AccountContact
    EventTypeChoices.ACCOUNT_CONTACT_CREATED,
    EventTypeChoices.ACCOUNT_CONTACT_UPDATED,
    EventTypeChoices.ACCOUNT_CONTACT_DELETED,
    # AccountPunchlist
    EventTypeChoices.ACCOUNT_PUNCHLIST_CREATED,
    EventTypeChoices.ACCOUNT_PUNCHLIST_UPDATED,
    EventTypeChoices.ACCOUNT_PUNCHLIST_DELETED,
}
|
|
|
|
|
|
async def enrich_account_domain(entity_type: str, entity_id: str) -> Dict[str, Any]:
    """Return enrichment metadata for an account-domain entity.

    The loader matching ``entity_type`` ('Account', 'AccountAddress',
    'AccountContact' or 'AccountPunchlist') is awaited with ``entity_id``
    and its result is returned unchanged.

    Args:
        entity_type: Name of the entity class the event refers to.
        entity_id: Primary key of that entity.

    Returns:
        The metadata dict produced by the matching loader, or an empty dict
        when ``entity_type`` is not one of the four handled names.
    """
    loaders = {
        'Account': _load_account_metadata,
        'AccountAddress': _load_account_address_metadata,
        'AccountContact': _load_account_contact_metadata,
        'AccountPunchlist': _load_account_punchlist_metadata,
    }

    loader = loaders.get(entity_type)
    if loader is None:
        # Entity types outside the account domain get no enrichment.
        return {}
    return await loader(entity_id)
|
|
|
|
|
|
async def _load_account_metadata(entity_id: str) -> Dict[str, Any]:
    """Fetch name metadata for the Account with primary key ``entity_id``.

    Returns:
        A dict with ``'account_name'`` and, when the account has a customer,
        ``'customer_name'``. Falsy names are stored as ``''``. An empty dict
        is returned when no such Account exists.
    """
    # The model import is kept local to the function.
    from core.models.account import Account

    @sync_to_async
    def _fetch():
        # Synchronous ORM access, wrapped so it can be awaited below.
        try:
            record = Account.objects.select_related('customer').get(pk=entity_id)
            result = {'account_name': record.name or ''}
            customer = record.customer
            if customer:
                result['customer_name'] = customer.name or ''
            return result
        except Account.DoesNotExist:
            return {}

    return await _fetch()
|
|
|
|
|
|
async def _load_account_address_metadata(entity_id: str) -> Dict[str, Any]:
    """Fetch metadata for the AccountAddress with primary key ``entity_id``.

    Returns:
        A dict that always contains ``'account_id'`` (as a string). When the
        address has an account, ``'account_name'`` is added, and
        ``'customer_name'`` too if that account has a customer. ``'address'``
        is the non-empty street address and city joined with ``', '`` and is
        omitted when both are empty. An empty dict is returned when no such
        AccountAddress exists.
    """
    # The model import is kept local to the function.
    from core.models.account import AccountAddress

    @sync_to_async
    def _fetch():
        # Synchronous ORM access, wrapped so it can be awaited below.
        try:
            queryset = AccountAddress.objects.select_related(
                'account',
                'account__customer',
            )
            record = queryset.get(pk=entity_id)

            # NOTE(review): str() of a NULL account_id yields the literal
            # 'None' - confirm the foreign key is non-nullable.
            result = {'account_id': str(record.account_id)}

            account = record.account
            if account:
                result['account_name'] = account.name or ''
                customer = account.customer
                if customer:
                    result['customer_name'] = customer.name or ''

            # Street comes first, then city; empty components are skipped.
            parts = [
                part
                for part in (record.street_address, record.city)
                if part
            ]
            if parts:
                result['address'] = ', '.join(parts)

            return result
        except AccountAddress.DoesNotExist:
            return {}

    return await _fetch()
|
|
|
|
|
|
async def _load_account_contact_metadata(entity_id: str) -> Dict[str, Any]:
    """Fetch metadata for the AccountContact with primary key ``entity_id``.

    Returns:
        A dict that always contains ``'account_id'`` (as a string). When the
        contact has an account, ``'account_name'`` is added, and
        ``'customer_name'`` too if that account has a customer. Falsy names
        are stored as ``''``. An empty dict is returned when no such
        AccountContact exists.
    """
    # The model import is kept local to the function.
    from core.models.account import AccountContact

    @sync_to_async
    def _fetch():
        # Synchronous ORM access, wrapped so it can be awaited below.
        try:
            queryset = AccountContact.objects.select_related(
                'account',
                'account__customer',
            )
            record = queryset.get(pk=entity_id)

            # NOTE(review): str() of a NULL account_id yields the literal
            # 'None' - confirm the foreign key is non-nullable.
            result = {'account_id': str(record.account_id)}

            account = record.account
            if account:
                result['account_name'] = account.name or ''
                customer = account.customer
                if customer:
                    result['customer_name'] = customer.name or ''
            return result
        except AccountContact.DoesNotExist:
            return {}

    return await _fetch()
|
|
|
|
|
|
async def _load_account_punchlist_metadata(entity_id: str) -> Dict[str, Any]:
    """Fetch metadata for the AccountPunchlist with primary key ``entity_id``.

    Returns:
        A dict that always contains ``'account_id'`` (as a string). When the
        punchlist has an account, ``'account_name'`` is added, and
        ``'customer_name'`` too if that account has a customer. Falsy names
        are stored as ``''``. An empty dict is returned when no such
        AccountPunchlist exists.
    """
    # The model import is kept local to the function.
    from core.models.account_punchlist import AccountPunchlist

    @sync_to_async
    def _fetch():
        # Synchronous ORM access, wrapped so it can be awaited below.
        try:
            queryset = AccountPunchlist.objects.select_related(
                'account',
                'account__customer',
            )
            record = queryset.get(pk=entity_id)

            # NOTE(review): str() of a NULL account_id yields the literal
            # 'None' - confirm the foreign key is non-nullable.
            result = {'account_id': str(record.account_id)}

            account = record.account
            if account:
                result['account_name'] = account.name or ''
                customer = account.customer
                if customer:
                    result['customer_name'] = customer.name or ''
            return result
        except AccountPunchlist.DoesNotExist:
            return {}

    return await _fetch()
|