"""GraphQL mutations for uploading, updating and deleting session videos."""
import contextlib
import os
import tempfile
from typing import Any, Optional, cast

import strawberry
from strawberry import Info
from strawberry.file_uploads import Upload
from strawberry.relay import GlobalID
from channels.db import database_sync_to_async
from django.core.exceptions import ValidationError
from django.core.files import File
from django.core.files.base import ContentFile

from core.models.session import ServiceSession, ProjectSession
from core.models.session_video import ServiceSessionVideo, ProjectSessionVideo
from core.graphql.types.session_video import (
    ServiceSessionVideoType,
    ProjectSessionVideoType,
)
from core.services.video import (
    verify_video_bytes,
    extract_video_metadata,
    generate_video_thumbnail,
)
from core.graphql.utils import update_object, delete_object, _decode_global_id
from core.graphql.inputs.session_video import (
    ServiceSessionVideoUpdateInput,
    ProjectSessionVideoUpdateInput,
)
from core.services.events import (
    publish_session_video_uploaded,
    publish_session_video_updated,
    publish_session_video_deleted,
    publish_session_media_internal_flagged,
)


def _create_session_video(
    *,
    video_model: Any,
    session_model: Any,
    session_field: str,
    session_pk: Any,
    filename: str,
    data: bytes,
    content_type: str,
    title: Optional[str],
    notes: Optional[str],
    internal: bool,
    uploaded_by: Any,
) -> Any:
    """Persist an uploaded video (and its thumbnail, if any) for a session.

    Synchronous: performs ORM and file-storage calls, so async callers must
    run it through ``database_sync_to_async``.

    Args:
        video_model: Model class to instantiate (``ServiceSessionVideo`` or
            ``ProjectSessionVideo``).
        session_model: Model class of the parent session.
        session_field: Name of the foreign-key field on ``video_model`` that
            points at the session (e.g. ``"service_session"``).
        session_pk: Primary key of the parent session.
        filename: Client-supplied file name; its extension is reused for the
            temporary copy and the name is passed to the storage backend.
        data: Raw video bytes.
        content_type: Content type returned by ``verify_video_bytes``.
        title: Optional title; stored as ``""`` when falsy.
        notes: Optional notes; stored as ``""`` when falsy.
        internal: Value stored in the video's ``internal`` field.
        uploaded_by: Profile stored in ``uploaded_by_team_profile``.

    Returns:
        The saved ``video_model`` instance.

    Raises:
        session_model.DoesNotExist: If no session matches ``session_pk``.
    """
    session = session_model.objects.get(pk=session_pk)

    # The metadata/thumbnail helpers take filesystem paths, so the bytes are
    # written to a temporary file before the video is handed to storage.
    video_ext = os.path.splitext(filename)[1] or ".mp4"
    video_tmp_path: Optional[str] = None
    thumb_tmp_path: Optional[str] = None
    try:
        video_fd, video_tmp_path = tempfile.mkstemp(suffix=video_ext)
        # A buffered file object writes the whole payload (a bare os.write
        # may stop short) and closes the descriptor even if the write fails.
        with os.fdopen(video_fd, "wb") as video_tmp:
            video_tmp.write(data)

        # Only the path is needed for the thumbnail; release the descriptor.
        thumb_fd, thumb_tmp_path = tempfile.mkstemp(suffix=".jpg")
        os.close(thumb_fd)

        metadata = extract_video_metadata(video_tmp_path)
        thumbnail_generated = generate_video_thumbnail(
            video_tmp_path, thumb_tmp_path, timestamp=1.0
        )

        video = video_model(
            title=title or "",
            notes=notes or "",
            uploaded_by_team_profile=uploaded_by,
            content_type=content_type,
            internal=internal,
            **{session_field: session},
        )
        if metadata:
            video.width, video.height, video.duration_seconds = metadata

        # save=True persists the row, which gives the instance the id used
        # in the thumbnail file name below.
        video.video.save(filename, ContentFile(data), save=True)

        if thumbnail_generated and os.path.exists(thumb_tmp_path):
            with open(thumb_tmp_path, "rb") as thumb_file:
                video.thumbnail.save(
                    f"thumb_{video.id}.jpg", File(thumb_file), save=False
                )

        video.save()
        return video
    finally:
        # Remove whichever temp files were created, even on failure.
        for tmp_path in (video_tmp_path, thumb_tmp_path):
            if tmp_path is not None:
                with contextlib.suppress(FileNotFoundError):
                    os.unlink(tmp_path)


async def _upload_session_video(
    info: Info,
    *,
    session_id: GlobalID,
    file: Upload,
    title: Optional[str],
    notes: Optional[str],
    internal: bool,
    video_model: Any,
    session_model: Any,
    session_field: str,
) -> Any:
    """Validate an upload, store it, and publish the related events.

    Shared implementation behind both upload mutations.

    Returns:
        The saved ``video_model`` instance.

    Raises:
        ValidationError: If the request has no ``profile``, no file (or file
            name) was supplied, or the upload is empty. Anything raised by
            ``verify_video_bytes`` propagates unchanged.
    """
    req_profile = getattr(info.context.request, "profile", None)
    if not req_profile:
        raise ValidationError("Authentication required.")
    if not file or not getattr(file, "filename", None):
        raise ValidationError("No file provided.")

    filename: str = file.filename
    data = await file.read()
    if not data:
        raise ValidationError("Empty file upload.")

    # Validate video file and get content type
    content_type = verify_video_bytes(data, filename)
    sess_pk = _decode_global_id(session_id)

    instance = await database_sync_to_async(_create_session_video)(
        video_model=video_model,
        session_model=session_model,
        session_field=session_field,
        session_pk=sess_pk,
        filename=filename,
        data=data,
        content_type=content_type,
        title=title,
        notes=notes,
        internal=internal,
        uploaded_by=req_profile,
    )

    video_id = str(instance.id)
    parent_session_id = str(getattr(instance, f"{session_field}_id"))

    await publish_session_video_uploaded(
        video_id=video_id,
        session_id=parent_session_id,
        is_internal=internal,
        triggered_by=req_profile,
    )
    # If marked as internal, also publish internal flag event
    if internal:
        await publish_session_media_internal_flagged(
            media_id=video_id,
            media_type="SessionVideo",
            session_id=parent_session_id,
            triggered_by=req_profile,
        )
    return instance


@strawberry.type
class Mutation:
    """Mutations for ServiceSession and ProjectSession videos."""

    @strawberry.mutation(description="Upload a video to a ServiceSession")
    async def upload_service_session_video(
        self,
        info: Info,
        session_id: GlobalID,
        file: Upload,
        title: Optional[str] = None,
        notes: Optional[str] = None,
        internal: bool = True,
    ) -> ServiceSessionVideoType:
        """
        Upload a video file to a ServiceSession.

        Accepts video formats: MP4, MOV, WebM, AVI, MKV
        Maximum file size: 250 MB

        Publishes a "video uploaded" event and, when ``internal`` is true,
        an additional "internal flagged" event.

        Raises:
            ValidationError: If the request is unauthenticated, or the file
                is missing or empty.
        """
        instance = await _upload_session_video(
            info,
            session_id=session_id,
            file=file,
            title=title,
            notes=notes,
            internal=internal,
            video_model=ServiceSessionVideo,
            session_model=ServiceSession,
            session_field="service_session",
        )
        return cast(ServiceSessionVideoType, instance)

    @strawberry.mutation(description="Upload a video to a ProjectSession")
    async def upload_project_session_video(
        self,
        info: Info,
        session_id: GlobalID,
        file: Upload,
        title: Optional[str] = None,
        notes: Optional[str] = None,
        internal: bool = True,
    ) -> ProjectSessionVideoType:
        """
        Upload a video file to a ProjectSession.

        Accepts video formats: MP4, MOV, WebM, AVI, MKV
        Maximum file size: 250 MB

        Publishes a "video uploaded" event and, when ``internal`` is true,
        an additional "internal flagged" event.

        Raises:
            ValidationError: If the request is unauthenticated, or the file
                is missing or empty.
        """
        instance = await _upload_session_video(
            info,
            session_id=session_id,
            file=file,
            title=title,
            notes=notes,
            internal=internal,
            video_model=ProjectSessionVideo,
            session_model=ProjectSession,
            session_field="project_session",
        )
        return cast(ProjectSessionVideoType, instance)

    @strawberry.mutation(description="Update an existing ServiceSession video (e.g., title)")
    async def update_service_session_video(
        self, info: Info, input: ServiceSessionVideoUpdateInput
    ) -> ServiceSessionVideoType:
        """Update a ServiceSession video and publish a "video updated" event.

        Forwards ``id``, ``title``, ``notes`` and ``internal`` from ``input``
        to ``update_object``.
        """
        # NOTE(review): unlike the upload mutations, no authentication check
        # is visible here - confirm it is enforced elsewhere.
        payload = {
            "id": input.id,
            "title": input.title,
            "notes": input.notes,
            "internal": input.internal,
        }
        instance = await update_object(payload, ServiceSessionVideo)

        # Publish events
        profile = getattr(info.context.request, "profile", None)
        await publish_session_video_updated(
            video_id=str(instance.id),
            session_id=str(instance.service_session_id),
            triggered_by=profile,
        )
        return cast(ServiceSessionVideoType, instance)

    @strawberry.mutation(description="Update an existing ProjectSession video (e.g., title)")
    async def update_project_session_video(
        self, info: Info, input: ProjectSessionVideoUpdateInput
    ) -> ProjectSessionVideoType:
        """Update a ProjectSession video and publish a "video updated" event.

        Forwards ``id``, ``title``, ``notes`` and ``internal`` from ``input``
        to ``update_object``.
        """
        # NOTE(review): unlike the upload mutations, no authentication check
        # is visible here - confirm it is enforced elsewhere.
        payload = {
            "id": input.id,
            "title": input.title,
            "notes": input.notes,
            "internal": input.internal,
        }
        instance = await update_object(payload, ProjectSessionVideo)

        # Publish events
        profile = getattr(info.context.request, "profile", None)
        await publish_session_video_updated(
            video_id=str(instance.id),
            session_id=str(instance.project_session_id),
            triggered_by=profile,
        )
        return cast(ProjectSessionVideoType, instance)

    @strawberry.mutation(description="Delete a ServiceSession video")
    async def delete_service_session_video(self, info: Info, id: strawberry.ID) -> strawberry.ID:
        """Delete a video from a ServiceSession.

        Returns the ``id`` that was passed in and publishes a "video deleted"
        event.

        Raises:
            ValueError: If ``delete_object`` returns a falsy value.
        """
        # Delete the instance (delete_object returns the instance before deletion)
        instance = await delete_object(id, ServiceSessionVideo)
        if not instance:
            raise ValueError(f"ServiceSessionVideo with ID {id} does not exist")

        # Publish events
        # NOTE(review): Django clears the primary key on an instance once
        # Model.delete() has run; this relies on delete_object handing back
        # an instance whose id is still populated - confirm.
        profile = getattr(info.context.request, "profile", None)
        await publish_session_video_deleted(
            video_id=str(instance.id),
            session_id=str(instance.service_session_id),
            triggered_by=profile,
        )
        return id

    @strawberry.mutation(description="Delete a ProjectSession video")
    async def delete_project_session_video(self, info: Info, id: strawberry.ID) -> strawberry.ID:
        """Delete a video from a ProjectSession.

        Returns the ``id`` that was passed in and publishes a "video deleted"
        event.

        Raises:
            ValueError: If ``delete_object`` returns a falsy value.
        """
        # Delete the instance (delete_object returns the instance before deletion)
        instance = await delete_object(id, ProjectSessionVideo)
        if not instance:
            raise ValueError(f"ProjectSessionVideo with ID {id} does not exist")

        # Publish events
        # NOTE(review): Django clears the primary key on an instance once
        # Model.delete() has run; this relies on delete_object handing back
        # an instance whose id is still populated - confirm.
        profile = getattr(info.context.request, "profile", None)
        await publish_session_video_deleted(
            video_id=str(instance.id),
            session_id=str(instance.project_session_id),
            triggered_by=profile,
        )
        return id