143 lines
4.4 KiB
Python
143 lines
4.4 KiB
Python
import io
|
|
import json
|
|
import os
|
|
from google.oauth2 import service_account
|
|
from googleapiclient.discovery import build
|
|
from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
|
|
from api.models import Projects
|
|
|
|
|
|
def get_drive_credentials():
|
|
"""Get Google Drive credentials from environment."""
|
|
key = os.environ.get('SERVICE_ACCOUNT_KEY')
|
|
if not key:
|
|
key_file_path = os.environ.get('SERVICE_ACCOUNT_KEY_FILE', '/app/service_account_key.json')
|
|
if os.path.exists(key_file_path):
|
|
with open(key_file_path, 'r') as f:
|
|
return json.load(f)
|
|
return None
|
|
|
|
return json.loads(key) if isinstance(key, str) else key
|
|
|
|
|
|
def duplicate_punchlist(new_name):
|
|
"""
|
|
Duplicates a punchlist template from Google Drive.
|
|
|
|
Arguments:
|
|
new_name: str : New filename for the punchlist
|
|
|
|
Returns:
|
|
dict : Google Drive file object for the new punchlist
|
|
"""
|
|
scopes = ['https://www.googleapis.com/auth/drive']
|
|
key = get_drive_credentials()
|
|
impersonator = os.environ.get('DISPATCH_EMAIL', 'dispatch@example.com')
|
|
|
|
creds = service_account.Credentials.from_service_account_info(
|
|
key, scopes=scopes, subject=impersonator
|
|
)
|
|
service = build('drive', 'v3', credentials=creds)
|
|
|
|
# Get folder and template IDs from environment
|
|
folder_id = os.environ.get('PUNCHLIST_FOLDER_ID')
|
|
template_id = os.environ.get('PUNCHLIST_TEMPLATE_ID')
|
|
|
|
request_body = {
|
|
'name': new_name,
|
|
'parents': [folder_id] if folder_id else [],
|
|
}
|
|
|
|
return service.files().copy(fileId=template_id, body=request_body).execute()
|
|
|
|
|
|
def create_pdf_from_punchlist(sheet_id):
|
|
"""
|
|
Creates a PDF from a Google Sheet.
|
|
|
|
Args:
|
|
sheet_id: str : The ID of the Google Sheet
|
|
|
|
Returns:
|
|
str : The file ID of the created PDF, or None on error
|
|
"""
|
|
try:
|
|
scopes = ['https://www.googleapis.com/auth/drive']
|
|
key = get_drive_credentials()
|
|
impersonator = os.environ.get('DISPATCH_EMAIL', 'dispatch@example.com')
|
|
|
|
creds = service_account.Credentials.from_service_account_info(
|
|
key, scopes=scopes, subject=impersonator
|
|
)
|
|
service = build('drive', 'v3', credentials=creds)
|
|
|
|
# Get the original sheet's metadata
|
|
file_metadata = service.files().get(fileId=sheet_id, fields='name,parents').execute()
|
|
sheet_name = file_metadata.get('name')
|
|
parent_folder_id = file_metadata.get('parents', [None])[0]
|
|
|
|
pdf_filename = f"{sheet_name}.pdf"
|
|
|
|
# Export as PDF
|
|
request = service.files().export_media(fileId=sheet_id, mimeType='application/pdf')
|
|
fh = io.BytesIO()
|
|
downloader = MediaIoBaseDownload(fh, request)
|
|
done = False
|
|
while not done:
|
|
status, done = downloader.next_chunk()
|
|
|
|
# Upload PDF to same folder
|
|
file_metadata = {
|
|
'name': pdf_filename,
|
|
'mimeType': 'application/pdf',
|
|
}
|
|
if parent_folder_id:
|
|
file_metadata['parents'] = [parent_folder_id]
|
|
|
|
fh.seek(0)
|
|
media = MediaIoBaseUpload(fh, mimetype='application/pdf')
|
|
file = service.files().create(
|
|
body=file_metadata, media_body=media, fields='id'
|
|
).execute()
|
|
|
|
return file.get('id')
|
|
|
|
except Exception as e:
|
|
print(f"An error occurred: {e}")
|
|
return None
|
|
|
|
|
|
def store_pdf_as_bytecode(pdf_id, proj_id):
|
|
"""
|
|
Retrieves a PDF from Google Drive and stores it in the database.
|
|
|
|
Args:
|
|
pdf_id: str : The ID of the PDF file in Google Drive
|
|
proj_id: str : The project ID in the database
|
|
"""
|
|
try:
|
|
scopes = ['https://www.googleapis.com/auth/drive']
|
|
key = get_drive_credentials()
|
|
impersonator = os.environ.get('DISPATCH_EMAIL', 'dispatch@example.com')
|
|
|
|
creds = service_account.Credentials.from_service_account_info(
|
|
key, scopes=scopes, subject=impersonator
|
|
)
|
|
service = build('drive', 'v3', credentials=creds)
|
|
|
|
request = service.files().get_media(fileId=pdf_id)
|
|
fh = io.BytesIO()
|
|
downloader = MediaIoBaseDownload(fh, request)
|
|
done = False
|
|
while not done:
|
|
status, done = downloader.next_chunk()
|
|
|
|
pdf_bytes = fh.getvalue()
|
|
|
|
project = Projects.objects.get(id=proj_id)
|
|
project.punchlist = pdf_bytes
|
|
project.save()
|
|
|
|
except Exception as e:
|
|
print(f"An error occurred: {e}")
|