2026-01-26 10:30:49 -05:00

178 lines
4.8 KiB
Python

"""
Repository for Customer model operations.
"""
from typing import Optional
from django.db.models import Q, QuerySet
from django.utils import timezone
from backend.core.repositories.base import BaseRepository
from backend.core.models import Customer
class CustomerRepository(BaseRepository[Customer]):
    """
    Repository for Customer model operations.

    A customer is treated as *active* when ``end_date`` is NULL or lies
    strictly after today's date, and as *inactive* when ``end_date`` is set
    and is on or before today's date.
    """

    model = Customer

    @staticmethod
    def _today():
        """
        Return the date used as "today" by the active/inactive checks.

        NOTE(review): ``timezone.now().date()`` is the UTC calendar date when
        ``USE_TZ`` is enabled, which can differ from the local date around
        midnight. Confirm whether the project's current-time-zone date is
        what is actually wanted here.
        """
        return timezone.now().date()

    @classmethod
    def _active_q(cls) -> Q:
        """
        Build the predicate that selects active customers.

        Shared by ``get_active`` and ``search`` so both apply the same
        definition of "active".
        """
        return Q(end_date__isnull=True) | Q(end_date__gt=cls._today())

    @classmethod
    def _get_with_prefetch(cls, customer_id: str, relation: str) -> Optional[Customer]:
        """
        Fetch one customer by primary key with ``relation`` prefetched.

        Args:
            customer_id: The customer ID (matched against ``pk``)
            relation: Name passed to ``prefetch_related``

        Returns:
            The customer, or None if no row has that primary key
        """
        try:
            return Customer.objects.prefetch_related(relation).get(pk=customer_id)
        except Customer.DoesNotExist:
            return None

    @classmethod
    def get_active(cls) -> QuerySet[Customer]:
        """
        Get all active customers.

        Returns:
            QuerySet of customers with no end_date or an end_date after today
        """
        return Customer.objects.filter(cls._active_q())

    @classmethod
    def get_inactive(cls) -> QuerySet[Customer]:
        """
        Get all inactive customers.

        Returns:
            QuerySet of customers whose end_date is set and is today or earlier
        """
        return Customer.objects.filter(
            end_date__isnull=False,
            end_date__lte=cls._today(),
        )

    @classmethod
    def mark_inactive(cls, customer_id: str) -> Optional[Customer]:
        """
        Mark a customer as inactive by setting its end_date to today.

        Because "inactive" means ``end_date <= today``, the customer stops
        matching ``get_active`` immediately.

        Args:
            customer_id: The customer ID

        Returns:
            Whatever ``cls.update`` returns. That method is defined outside
            this class body (presumably on BaseRepository); it is expected to
            return the updated customer or None if not found - verify there.
        """
        return cls.update(customer_id, {'end_date': cls._today()})

    @classmethod
    def search(cls, search_term: str, include_inactive: bool = False) -> QuerySet[Customer]:
        """
        Search customers by name or contact info.

        The matching itself is delegated to the parent class's ``search``,
        which receives the term and the list of fields below; how the term is
        matched against those fields is decided there.

        Args:
            search_term: The search term
            include_inactive: Whether to include inactive customers

        Returns:
            QuerySet of matching customers, restricted to active customers
            unless ``include_inactive`` is true
        """
        queryset = super().search(
            search_term,
            [
                'name',
                'primary_contact_first_name',
                'primary_contact_last_name',
                'primary_contact_email',
                'secondary_contact_first_name',
                'secondary_contact_last_name',
                'secondary_contact_email',
                'billing_contact_first_name',
                'billing_contact_last_name',
                'billing_email'
            ]
        )
        if not include_inactive:
            queryset = queryset.filter(cls._active_q())
        return queryset

    @classmethod
    def get_by_email(cls, email: str) -> Optional[Customer]:
        """
        Get a customer by email.

        The address is compared (exact match) against the primary contact,
        secondary contact and billing e-mail fields.

        Args:
            email: The email address

        Returns:
            The first matching customer, or None if nothing matches or if
            ``email`` is empty/None
        """
        # An empty string would match customers whose optional e-mail fields
        # are blank, and None is translated by the ORM into an IS NULL test;
        # either way an unrelated customer would be returned.
        if not email:
            return None
        return Customer.objects.filter(
            Q(primary_contact_email=email) |
            Q(secondary_contact_email=email) |
            Q(billing_email=email)
        ).first()

    @classmethod
    def get_with_accounts(cls, customer_id: str) -> Optional[Customer]:
        """
        Get a customer with prefetched accounts.

        Args:
            customer_id: The customer ID

        Returns:
            The customer with accounts or None if not found
        """
        return cls._get_with_prefetch(customer_id, 'accounts')

    @classmethod
    def get_with_projects(cls, customer_id: str) -> Optional[Customer]:
        """
        Get a customer with prefetched projects.

        Args:
            customer_id: The customer ID

        Returns:
            The customer with projects or None if not found
        """
        return cls._get_with_prefetch(customer_id, 'projects')

    @classmethod
    def get_with_invoices(cls, customer_id: str) -> Optional[Customer]:
        """
        Get a customer with prefetched invoices.

        Args:
            customer_id: The customer ID

        Returns:
            The customer with invoices or None if not found
        """
        return cls._get_with_prefetch(customer_id, 'invoices')

    @classmethod
    def filter_customers(cls, name=None, city=None, state=None, start_date=None, end_date=None) -> QuerySet[Customer]:
        """
        Filter customers by multiple criteria.

        Every criterion is optional; falsy values (None, empty string) are
        skipped, and the supplied ones are combined with AND.

        Args:
            name: Case-insensitive substring of the customer name
            city: Case-insensitive substring of the billing city
            state: Billing state, compared case-insensitively for equality
            start_date: Keep customers whose start_date is on or after this
            end_date: Keep customers whose end_date is on or before this;
                customers with a NULL end_date do not satisfy this comparison

        Returns:
            QuerySet of customers matching all supplied criteria
        """
        queryset = Customer.objects.all()
        if name:
            queryset = queryset.filter(name__icontains=name)
        if city:
            queryset = queryset.filter(billing_city__icontains=city)
        if state:
            queryset = queryset.filter(billing_state__iexact=state)
        if start_date:
            queryset = queryset.filter(start_date__gte=start_date)
        if end_date:
            queryset = queryset.filter(end_date__lte=end_date)
        return queryset