nexus-2/backend/api/views.py
2026-01-26 10:12:01 -05:00

604 lines
22 KiB
Python

import calendar
import json
import os
from datetime import datetime, timedelta
from django.contrib.auth import update_session_auth_hash
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
from rest_framework import viewsets, permissions, status
from rest_framework.decorators import api_view, permission_classes, action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from .google import create_event, export_punchlist_to_sheet
from .models import (
Profile, Customer, Account, Revenue, Labor,
Schedule, Service, Project, Invoice, Report, Punchlist
)
from .serializers import (
UserSerializer, ProfileSerializer, CustomerSerializer,
AccountSerializer, RevenueSerializer, LaborSerializer,
ScheduleSerializer, ServiceSerializer, ProjectSerializer,
InvoiceSerializer, ReportSerializer, PunchlistSerializer
)
# Users API
class UserViewSet(viewsets.ModelViewSet):
    # Model viewset over Django's built-in auth User model; every action
    # requires an authenticated caller.
    # NOTE(review): authentication is the only check here, so any logged-in
    # user can reach the write actions -- confirm that is intended.
    permission_classes = [IsAuthenticated]
    serializer_class = UserSerializer
    queryset = User.objects.all()
# Profiles API
class ProfileViewSet(viewsets.ModelViewSet):
    # Model viewset over Profile records; every action requires an
    # authenticated caller.
    permission_classes = [IsAuthenticated]
    serializer_class = ProfileSerializer
    queryset = Profile.objects.all()
# Customers API
class CustomerViewSet(viewsets.ModelViewSet):
    # Model viewset over Customer records; every action requires an
    # authenticated caller.
    permission_classes = [IsAuthenticated]
    serializer_class = CustomerSerializer
    queryset = Customer.objects.all()
# Accounts API
class AccountViewSet(viewsets.ModelViewSet):
    # Model viewset over Account records; authenticated callers only.
    queryset = Account.objects.all()
    serializer_class = AccountSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return every account, or only one customer's when ``?customer_id=`` is given."""
        accounts = Account.objects.all()
        customer_filter = self.request.query_params.get('customer_id')
        if customer_filter is None:
            return accounts
        return accounts.filter(customer_id=customer_filter)
# Revenues API
class RevenueViewSet(viewsets.ModelViewSet):
    # Model viewset over Revenue records; authenticated callers only.
    queryset = Revenue.objects.all()
    serializer_class = RevenueSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return every revenue row, or only one account's when ``?account_id=`` is given."""
        revenues = Revenue.objects.all()
        account_filter = self.request.query_params.get('account_id')
        return (
            revenues
            if account_filter is None
            else revenues.filter(account_id=account_filter)
        )
# Labor API
class LaborViewSet(viewsets.ModelViewSet):
    # Model viewset over Labor records; authenticated callers only.
    queryset = Labor.objects.all()
    serializer_class = LaborSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return every labor row, or only one account's when ``?account_id=`` is given."""
        requested_account = self.request.query_params.get('account_id')
        # An empty lookup dict makes filter() equivalent to all().
        lookup = {} if requested_account is None else {'account_id': requested_account}
        return Labor.objects.filter(**lookup)
# Schedules API
class ScheduleViewSet(viewsets.ModelViewSet):
    # Model viewset over Schedule records; authenticated callers only.
    queryset = Schedule.objects.all()
    serializer_class = ScheduleSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return every schedule, or only one account's when ``?account_id=`` is given."""
        account_filter = self.request.query_params.get('account_id')
        schedules = Schedule.objects.all()
        if account_filter is not None:
            return schedules.filter(account_id=account_filter)
        return schedules
# Services API
class ServiceViewSet(viewsets.ModelViewSet):
    # Model viewset over Service records; authenticated callers only.
    queryset = Service.objects.all()
    serializer_class = ServiceSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return services, narrowed by the optional ``account_id`` and ``team_member_id`` query parameters."""
        params = self.request.query_params
        services = Service.objects.all()
        # (query-string parameter, ORM lookup) pairs, applied in this order as
        # successive filter() calls.
        optional_filters = (
            ('account_id', 'account_id'),
            ('team_member_id', 'team_members__id'),
        )
        for param_name, orm_lookup in optional_filters:
            wanted = params.get(param_name)
            if wanted is not None:
                services = services.filter(**{orm_lookup: wanted})
        return services
# Projects API
class ProjectViewSet(viewsets.ModelViewSet):
    # Model viewset over Project records; authenticated callers only.
    queryset = Project.objects.all()
    serializer_class = ProjectSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return projects, narrowed by the optional ``customer_id`` and ``account_id`` query parameters."""
        params = self.request.query_params
        # Keep only the filters the caller actually supplied; both are plain
        # column lookups, so one combined filter() ANDs them together.
        criteria = {
            field: params.get(field)
            for field in ('customer_id', 'account_id')
            if params.get(field) is not None
        }
        return Project.objects.filter(**criteria)
# Invoices API
class InvoiceViewSet(viewsets.ModelViewSet):
    # Model viewset over Invoice records plus a `mark_as_paid` detail action;
    # authenticated callers only.
    queryset = Invoice.objects.all()
    serializer_class = InvoiceSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return every invoice, or only one customer's when ``?customer_id=`` is given."""
        queryset = Invoice.objects.all()
        customer_id = self.request.query_params.get('customer_id')
        if customer_id is not None:
            queryset = queryset.filter(customer_id=customer_id)
        return queryset

    # Request body (all optional):
    #   date_paid    -- payment date string (e.g. "2025-03-31"); defaults to
    #                   today's date in the current time zone.
    #   payment_type -- stored on the invoice when non-empty.
    # Responds with the serialized invoice, or 400 when date_paid is not a
    # valid date.
    @action(detail=True, methods=['post'])
    def mark_as_paid(self, request, pk=None):
        """
        Mark an invoice as paid with payment details
        """
        from django.utils.dateparse import parse_date

        invoice = self.get_object()

        raw_date = request.data.get('date_paid')
        if raw_date:
            # Parse up front so a malformed value is a client error (400)
            # instead of an unhandled failure inside invoice.save().
            try:
                paid_on = parse_date(raw_date) if isinstance(raw_date, str) else None
            except ValueError:
                # Well-formed but impossible date, e.g. "2025-02-30".
                paid_on = None
            if paid_on is None:
                return Response(
                    {"error": "date_paid must be a valid date in YYYY-MM-DD format"},
                    status=status.HTTP_400_BAD_REQUEST
                )
        else:
            # timezone.now() is UTC when USE_TZ is on; convert to the current
            # time zone first so an evening payment is not dated tomorrow.
            now = timezone.now()
            if timezone.is_aware(now):
                now = timezone.localtime(now)
            paid_on = now.date()

        invoice.status = 'paid'
        invoice.date_paid = paid_on

        # Set payment type only if provided; otherwise keep the stored value.
        payment_type = request.data.get('payment_type')
        if payment_type:
            invoice.payment_type = payment_type

        invoice.save()

        # Return the updated invoice
        serializer = self.get_serializer(invoice)
        return Response(serializer.data)
# Reports API
class ReportViewSet(viewsets.ModelViewSet):
    # Model viewset over Report records; authenticated callers only.
    queryset = Report.objects.all()
    serializer_class = ReportSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return every report, or only one team member's when ``?team_member_id=`` is given."""
        member_filter = self.request.query_params.get('team_member_id')
        reports = Report.objects.all()
        if member_filter is None:
            return reports
        return reports.filter(team_member_id=member_filter)
# User Profile API
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def current_user(request):
    """
    Endpoint to get the current authenticated user and their profile
    """
    account = request.user
    # 'profile' stays None unless a Profile row exists for this user.
    payload = {
        'user': UserSerializer(account).data,
        'profile': None
    }
    try:
        payload['profile'] = ProfileSerializer(Profile.objects.get(user=account)).data
    except ObjectDoesNotExist:
        # A user without a profile is not an error; return the user data alone.
        pass
    return Response(payload)
# Punchlist API
class PunchlistViewSet(viewsets.ModelViewSet):
    # Model viewset over Punchlist records with a custom `create` that takes
    # `project_id` / `account_id` in the request body; authenticated callers
    # only.
    queryset = Punchlist.objects.all()
    serializer_class = PunchlistSerializer
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self):
        """Return punchlists, narrowed by the optional ``project_id`` and ``account_id`` query parameters."""
        queryset = Punchlist.objects.all()
        project_id = self.request.query_params.get('project_id')
        account_id = self.request.query_params.get('account_id')
        if project_id is not None:
            queryset = queryset.filter(project_id=project_id)
        if account_id is not None:
            queryset = queryset.filter(account_id=account_id)
        return queryset

    @staticmethod
    def _related_lookup_failure(model, label, object_id):
        """Return an error Response when ``object_id`` matches no ``model`` row, else None.

        ``label`` is the capitalised model name used in messages ("Project",
        "Account"). A malformed id yields 400, an unknown id yields 404.
        """
        from django.core.exceptions import ValidationError

        try:
            found = model.objects.filter(id=object_id).exists()
        except (TypeError, ValueError, ValidationError):
            # The ORM rejects values it cannot coerce to the key type; report
            # that as a client error rather than letting it become a 500.
            return Response(
                {"error": f"{label.lower()}_id is not a valid identifier"},
                status=status.HTTP_400_BAD_REQUEST
            )
        if not found:
            return Response(
                {"error": f"{label} with id {object_id} does not exist"},
                status=status.HTTP_404_NOT_FOUND
            )
        return None

    def create(self, request, *args, **kwargs):
        """
        Create a new punchlist
        """
        # Project is validated before account, so the first error a client
        # sees for a given bad request is unchanged.
        project_id = request.data.get('project_id')
        if not project_id:
            return Response(
                {"error": "project_id is required"},
                status=status.HTTP_400_BAD_REQUEST
            )
        failure = self._related_lookup_failure(Project, 'Project', project_id)
        if failure is not None:
            return failure

        account_id = request.data.get('account_id')
        if not account_id:
            return Response(
                {"error": "account_id is required"},
                status=status.HTTP_400_BAD_REQUEST
            )
        failure = self._related_lookup_failure(Account, 'Account', account_id)
        if failure is not None:
            return failure

        # The request carries project_id/account_id; copy them onto the
        # `project` / `account` keys before handing the data to the serializer.
        # Work on a copy so request.data itself is never mutated.
        modified_data = request.data.copy()
        modified_data['project'] = project_id
        modified_data['account'] = account_id

        serializer = self.get_serializer(data=modified_data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        return Response(
            serializer.data,
            status=status.HTTP_201_CREATED,
            headers=headers
        )
# Change Password API
# Request body: current_password, new_password (both required).
# Responses: 200 on success; 400 when a field is missing, the new password is
# empty / not a string / rejected by the configured password validators, or
# the current password does not match.
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def change_password(request):
    """
    Endpoint to change user password
    """
    from django.contrib.auth.password_validation import validate_password
    from django.core.exceptions import ValidationError

    user = request.user

    # Check if required fields are present
    if not all(k in request.data for k in ('current_password', 'new_password')):
        return Response(
            {'error': 'Current password and new password are required'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # A JSON null would reach set_password(None), which marks the account as
    # having no usable password and locks the user out; reject it, along with
    # empty and non-string values.
    new_password = request.data['new_password']
    if not isinstance(new_password, str) or not new_password:
        return Response(
            {'error': 'New password must be a non-empty string'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Verify current password
    if not user.check_password(request.data['current_password']):
        return Response(
            {'error': 'Current password is incorrect'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Apply the validators configured in AUTH_PASSWORD_VALIDATORS; Django only
    # runs them from its own forms, so an API view has to call them itself.
    try:
        validate_password(new_password, user=user)
    except ValidationError as exc:
        return Response(
            {'error': ' '.join(exc.messages)},
            status=status.HTTP_400_BAD_REQUEST
        )

    user.set_password(new_password)
    user.save()

    # Update session to prevent logout
    update_session_auth_hash(request, user)
    return Response({'message': 'Password changed successfully'}, status=status.HTTP_200_OK)
# Reset Password API
# Request body: user_id, new_password (both required).
# Responses: 200 on success; 403 when the caller's profile role is not
# 'admin'; 404 when the caller or the target has no profile; 400 for missing
# or malformed parameters or a password rejected by the configured
# validators; 500 if saving the new password fails unexpectedly.
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def reset_password(request):
    """
    Endpoint for admins to reset a user's password
    """
    import logging

    from django.contrib.auth.password_validation import validate_password
    from django.core.exceptions import ValidationError

    # Only callers whose own profile carries the 'admin' role may proceed.
    try:
        caller_profile = Profile.objects.get(user=request.user)
    except ObjectDoesNotExist:
        return Response(
            {"error": "User profile not found"},
            status=status.HTTP_404_NOT_FOUND
        )
    if caller_profile.role != 'admin':
        return Response(
            {"error": "Only administrators can reset passwords"},
            status=status.HTTP_403_FORBIDDEN
        )

    # Get parameters
    user_id = request.data.get('user_id')
    new_password = request.data.get('new_password')
    if not user_id or not new_password:
        return Response(
            {"error": "Missing required parameters"},
            status=status.HTTP_400_BAD_REQUEST
        )
    if not isinstance(new_password, str):
        return Response(
            {"error": "new_password must be a string"},
            status=status.HTTP_400_BAD_REQUEST
        )

    # NOTE: despite its name, user_id is matched against Profile.id (not
    # User.id); the auth user is then reached through the profile.
    try:
        target_user = Profile.objects.get(id=user_id).user
    except ObjectDoesNotExist:
        return Response(
            {"error": "Target user profile not found"},
            status=status.HTTP_404_NOT_FOUND
        )
    except (TypeError, ValueError, ValidationError):
        # The ORM could not coerce user_id to the key type: a client error.
        return Response(
            {"error": "user_id is not a valid identifier"},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Apply the validators configured in AUTH_PASSWORD_VALIDATORS; Django only
    # runs them from its own forms, so an API view has to call them itself.
    try:
        validate_password(new_password, user=target_user)
    except ValidationError as exc:
        return Response(
            {"error": " ".join(exc.messages)},
            status=status.HTTP_400_BAD_REQUEST
        )

    try:
        target_user.set_password(new_password)
        target_user.save()
    except Exception:
        # Keep the JSON 500 contract, but log the details server-side instead
        # of echoing internal error text back to the client.
        logging.getLogger(__name__).exception(
            "Password reset failed for profile id %s", user_id
        )
        return Response(
            {"error": "Password reset failed"},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )

    return Response({"message": "Password reset successful"}, status=status.HTTP_200_OK)
# Generate Services API
# Names of the per-weekday boolean flags read from a schedule, indexed by
# date.weekday() (0 = Monday ... 6 = Sunday).
_WEEKDAY_SERVICE_FLAGS = (
    'monday_service',
    'tuesday_service',
    'wednesday_service',
    'thursday_service',
    'friday_service',
    'saturday_service',
    'sunday_service',
)
# date.weekday() values used by the holiday rules and the weekend-service rule.
_MONDAY = 0
_THURSDAY = 3
_FRIDAY = 4


def _nth_weekday_of_month(year, month, weekday, occurrence):
    """Return the date of the ``occurrence``-th ``weekday`` (0 = Monday) of a month.

    ``occurrence`` is 1-based; pass -1 for the last such weekday of the month.
    """
    if occurrence > 0:
        first = datetime(year, month, 1).date()
        first_match = first + timedelta(days=(weekday - first.weekday()) % 7)
        return first_match + timedelta(weeks=occurrence - 1)
    last = datetime(year, month, calendar.monthrange(year, month)[1]).date()
    return last - timedelta(days=(last.weekday() - weekday) % 7)


def _holidays_for_year(year):
    """Return the set of dates in ``year`` on which no service is generated.

    Uses the actual calendar date of each holiday (no "observed" shifting);
    for 2025 this yields exactly the dates that used to be hard-coded.
    """
    return {
        datetime(year, 1, 1).date(),                      # New Year's Day
        _nth_weekday_of_month(year, 1, _MONDAY, 3),       # MLK Day
        _nth_weekday_of_month(year, 2, _MONDAY, 3),       # Presidents' Day
        _nth_weekday_of_month(year, 5, _MONDAY, -1),      # Memorial Day
        datetime(year, 7, 4).date(),                      # Independence Day
        _nth_weekday_of_month(year, 9, _MONDAY, 1),       # Labor Day
        _nth_weekday_of_month(year, 11, _THURSDAY, 4),    # Thanksgiving
        datetime(year, 12, 25).date(),                    # Christmas
    }


# Request body:
#   month       -- 0-based month (0 = January ... 11 = December), required.
#   year        -- calendar year, required.
#   account_ids -- optional list of account ids (a single id is also
#                  accepted); when present only those accounts' schedules
#                  are used.
# For each schedule overlapping the month, one Service is created per date
# that is inside the schedule's range, is not a holiday, is flagged for
# service on that weekday (or is a Friday with weekend_service set), and has
# no Service yet for that account and date. All inserts share one transaction.
# Responds 400 for invalid month/year, otherwise a summary of what was created.
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def generate_services(request):
    """
    Batch generate services for a given month based on active schedules
    """
    try:
        month = int(request.data.get('month'))
        year = int(request.data.get('year'))
    except (ValueError, TypeError):
        return Response(
            {'error': 'Invalid parameters. Month and year must be integers.'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # Validate month
    if month < 0 or month > 11:
        return Response(
            {'error': 'Month must be between 0 (January) and 11 (December)'},
            status=status.HTTP_400_BAD_REQUEST
        )
    # datetime only supports years 1..9999; reject anything else here rather
    # than crashing further down.
    if not datetime.min.year <= year <= datetime.max.year:
        return Response(
            {'error': f'Year must be between {datetime.min.year} and {datetime.max.year}'},
            status=status.HTTP_400_BAD_REQUEST
        )

    # The API month is 0-based; datetime and calendar are 1-based.
    month_number = month + 1
    days_in_month = calendar.monthrange(year, month_number)[1]
    first_day = datetime(year, month_number, 1).date()
    last_day = datetime(year, month_number, days_in_month).date()
    holidays = _holidays_for_year(year)

    # Schedules whose [start_date, end_date] range overlaps the month; an
    # empty end_date means the schedule is open-ended.
    schedules_query = Schedule.objects.filter(
        start_date__lte=last_day
    ).filter(
        Q(end_date__isnull=True) |
        Q(end_date__gte=first_day)
    )

    # Filter by account IDs if provided
    account_ids = request.data.get('account_ids', None)
    if account_ids:
        # Accept a single id as well as a list: a bare string would otherwise
        # be iterated character by character by the __in lookup.
        if isinstance(account_ids, (str, int)):
            account_ids = [account_ids]
        schedules_query = schedules_query.filter(account__id__in=account_ids)

    # Load each schedule's account in the same query instead of one extra
    # query per schedule.
    schedules_query = schedules_query.select_related('account')

    # With USE_TZ on, Django interprets naive datetimes in the default time
    # zone (and warns); attach that zone explicitly to store the same instant
    # without the warning.
    default_tz = timezone.get_default_timezone() if settings.USE_TZ else None

    services_created = 0
    with transaction.atomic():
        for schedule in schedules_query:
            for day in range(1, days_in_month + 1):
                iter_date = datetime(year, month_number, day).date()

                # Skip dates outside the schedule's own range
                if iter_date < schedule.start_date:
                    continue
                if schedule.end_date is not None and iter_date > schedule.end_date:
                    continue
                if iter_date in holidays:
                    continue

                weekday = iter_date.weekday()
                # "Weekend service" is generated on Fridays, whether or not
                # the regular Friday flag is set.
                is_weekend_friday = schedule.weekend_service and weekday == _FRIDAY
                regular_service = getattr(schedule, _WEEKDAY_SERVICE_FLAGS[weekday])
                if not (regular_service or is_weekend_friday):
                    continue

                # Never create a second service for the same account and date
                already_exists = Service.objects.filter(
                    account=schedule.account,
                    date=iter_date
                ).exists()
                if already_exists:
                    continue

                # Service window: 6:00PM on the service date to 6:00AM the
                # next morning (computed on naive values, then localised).
                deadline_start = datetime(year, month_number, day, 18)
                deadline_end = deadline_start + timedelta(hours=12)
                if default_tz is not None:
                    deadline_start = timezone.make_aware(deadline_start, default_tz)
                    deadline_end = timezone.make_aware(deadline_end, default_tz)

                # Prepare notes
                note_parts = []
                if schedule.schedule_exception:
                    note_parts.append(f"Schedule exception: {schedule.schedule_exception}")
                if is_weekend_friday:
                    note_parts.append("Weekend Service")

                Service.objects.create(
                    account=schedule.account,
                    date=iter_date,
                    status='scheduled',
                    notes=" ".join(note_parts).strip(),
                    deadline_start=deadline_start,
                    deadline_end=deadline_end
                )
                services_created += 1

    # Return a summary of the operation
    return Response({
        'message': f'Successfully generated {services_created} services for {calendar.month_name[month + 1]} {year}.',
        'count': services_created,
        'month': month,
        'year': year,
        # NOTE(review): this is the number of matching schedules, which can
        # exceed the number of distinct accounts -- confirm what clients expect.
        'accountsProcessed': schedules_query.count()
    })
# Location of the Google service-account JSON key: the file 'google-sa.json'
# directly under settings.BASE_DIR. Read by create_calendar_event and
# export_punchlist on every request.
key_file_path = os.path.join(settings.BASE_DIR, 'google-sa.json')
# Google Calendar Event API
# Responses: 201 with create_event's result on success; 400 with that result
# when it contains an "error" key; 500 when the key file is missing or
# unparsable, or anything else fails.
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def create_calendar_event(request):
    """
    Create a Google Calendar event for a project or service by reading key from file
    """
    try:
        # Load the service-account credentials fresh on each request.
        try:
            with open(key_file_path, 'r') as key_file:
                credentials = json.load(key_file)
        except FileNotFoundError:
            return Response(
                {"error": "Google service account key file not found"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )
        except json.JSONDecodeError as decode_error:
            return Response(
                {"error": "Invalid Google service account key file format", "details": str(decode_error)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

        # The request body is passed through to create_event unchanged.
        outcome = create_event(credentials, request.data)
        outcome_status = (
            status.HTTP_400_BAD_REQUEST
            if "error" in outcome
            else status.HTTP_201_CREATED
        )
        return Response(outcome, status=outcome_status)
    except Exception as unexpected:
        return Response(
            {"error": "Failed to create calendar event", "details": str(unexpected)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )
# Request body: punchlistId (required), templateId (optional).
# Responses: 200 with sheetUrl / pdfUrl / alreadyExported on success; 400 when
# punchlistId is missing or the export reports failure; 500 when the key file
# is missing or unparsable, or anything else fails.
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def export_punchlist(request):
    """
    Export a punchlist to Google Sheets and generate a PDF
    """
    try:
        # Validate required parameters
        punchlist_id = request.data.get('punchlistId')
        if not punchlist_id:
            return Response(
                {"error": "punchlistId is required"},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Get optional template ID (None when the caller did not send one)
        template_id = request.data.get('templateId')

        # Load Google service account key
        try:
            with open(key_file_path, 'r') as f:
                google_service_account_key = json.load(f)
        except FileNotFoundError:
            return Response(
                {"error": "Google service account key file not found"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )
        except json.JSONDecodeError as e:
            return Response(
                {"error": "Invalid Google service account key file format", "details": str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

        # export_punchlist_to_sheet is already imported at module level, so no
        # function-local import is needed here.
        result = export_punchlist_to_sheet(google_service_account_key, punchlist_id, template_id)

        # A result without a truthy 'success' entry is reported as a failure.
        if not result.get('success', False):
            return Response(
                {"error": result.get('message', 'Unknown error during export')},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Return success response with URLs
        return Response({
            "success": True,
            "message": "Punchlist exported successfully",
            "sheetUrl": result.get('sheetUrl'),
            "pdfUrl": result.get('pdfUrl'),
            "alreadyExported": result.get('already_exported', False)
        }, status=status.HTTP_200_OK)
    except Exception as e:
        # Handle any unexpected errors
        return Response(
            {"error": "Failed to export punchlist", "details": str(e)},
            status=status.HTTP_500_INTERNAL_SERVER_ERROR
        )