Fix HTML rendering for service descriptions, allow zero price for services, improve image_url handling

This commit is contained in:
2026-02-01 22:31:00 -06:00
parent d3cad0e5fa
commit 72f17c8be9
32 changed files with 6958 additions and 414 deletions

View File

@@ -9,6 +9,7 @@ from sqlalchemy import select, func, and_, or_, desc, asc, distinct, delete
from sqlalchemy.orm import selectinload
import os
import logging
import aiofiles
from pathlib import Path
from pydantic import BaseModel, Field, EmailStr, ConfigDict
from typing import List, Optional, Dict, Any
@@ -20,6 +21,10 @@ import io
import csv
import base64
import shutil
import smtplib
import httpx
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from contextlib import asynccontextmanager
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter, A4
@@ -32,7 +37,7 @@ from models import (
User, Product, Service, CartItem, Order, OrderItem, OrderStatusHistory,
Review, Booking, Contact, InventoryLog, Category, SalesReport,
OrderStatus, UserRole, Base, ProductImage, ServiceImage,
AboutContent, TeamMember, CompanyValue
AboutContent, TeamMember, CompanyValue, Media, MediaType
)
ROOT_DIR = Path(__file__).parent
@@ -42,6 +47,26 @@ load_dotenv(ROOT_DIR / '.env')
UPLOAD_DIR = ROOT_DIR / 'uploads' / 'products'
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# Notification settings (from environment variables)
SMTP_HOST = os.environ.get('SMTP_HOST', 'smtp.gmail.com')
SMTP_PORT = int(os.environ.get('SMTP_PORT', 587))
SMTP_USER = os.environ.get('SMTP_USER', '')
SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', '')
ADMIN_EMAIL = os.environ.get('ADMIN_EMAIL', 'admin@prompttech.com')
ADMIN_PHONE = os.environ.get('ADMIN_PHONE', '+5016261234') # WhatsApp number
WHATSAPP_API_URL = os.environ.get('WHATSAPP_API_URL', '') # e.g., CallMeBot or Twilio
WHATSAPP_API_KEY = os.environ.get('WHATSAPP_API_KEY', '')
# Create media uploads directory
MEDIA_UPLOAD_DIR = ROOT_DIR / 'uploads' / 'media'
MEDIA_UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# Allowed file extensions for media
ALLOWED_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.svg', '.bmp', '.ico'}
ALLOWED_DOC_EXTENSIONS = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt'}
ALLOWED_VIDEO_EXTENSIONS = {'.mp4', '.webm', '.mov', '.avi'}
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB
# JWT Configuration
SECRET_KEY = os.environ.get('JWT_SECRET', 'techzone-super-secret-key-2024-production')
ALGORITHM = "HS256"
@@ -431,9 +456,139 @@ def booking_to_dict(booking: Booking) -> dict:
"preferred_date": booking.preferred_date,
"notes": booking.notes,
"status": booking.status,
"created_at": booking.created_at.isoformat() if booking.created_at else None
"created_at": booking.created_at.isoformat() if booking.created_at else None,
"completed_at": booking.completed_at.isoformat() if booking.completed_at else None,
"diagnosis": booking.diagnosis,
"work_performed": booking.work_performed,
"technician_notes": booking.technician_notes,
"service_cost": booking.service_cost,
"paid": booking.paid,
"paid_at": booking.paid_at.isoformat() if booking.paid_at else None,
"device_model": booking.device_model,
"serial_number": booking.serial_number,
"product_number": booking.product_number,
"screen_size": booking.screen_size
}
# Booking completion model
class BookingComplete(BaseModel):
diagnosis: Optional[str] = None
work_performed: Optional[str] = None
technician_notes: Optional[str] = None
service_cost: Optional[float] = None
paid: Optional[bool] = None
device_model: Optional[str] = None
serial_number: Optional[str] = None
product_number: Optional[str] = None
screen_size: Optional[str] = None
# ================== NOTIFICATION FUNCTIONS ==================
async def send_email_notification(subject: str, body: str, to_email: str = None):
"""Send email notification to admin"""
if not SMTP_USER or not SMTP_PASSWORD:
logger.warning("Email notification skipped - SMTP not configured")
return False
try:
to_email = to_email or ADMIN_EMAIL
msg = MIMEMultipart()
msg['From'] = SMTP_USER
msg['To'] = to_email
msg['Subject'] = subject
msg.attach(MIMEText(body, 'html'))
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.send_message(msg)
logger.info(f"Email notification sent to {to_email}")
return True
except Exception as e:
logger.error(f"Failed to send email: {e}")
return False
async def send_whatsapp_notification(message: str, phone: str = None):
"""Send WhatsApp notification using CallMeBot API or similar service"""
if not WHATSAPP_API_URL or not WHATSAPP_API_KEY:
logger.warning("WhatsApp notification skipped - API not configured")
return False
try:
phone = phone or ADMIN_PHONE
# CallMeBot API format: https://api.callmebot.com/whatsapp.php?phone=PHONE&text=MESSAGE&apikey=KEY
async with httpx.AsyncClient() as client:
params = {
'phone': phone.replace('+', ''),
'text': message,
'apikey': WHATSAPP_API_KEY
}
response = await client.get(WHATSAPP_API_URL, params=params, timeout=10)
if response.status_code == 200:
logger.info(f"WhatsApp notification sent to {phone}")
return True
else:
logger.error(f"WhatsApp API error: {response.status_code}")
return False
except Exception as e:
logger.error(f"Failed to send WhatsApp: {e}")
return False
async def notify_booking(booking_data: dict, service_name: str):
"""Send booking notifications via email and WhatsApp"""
# Format the booking details
booking_date = booking_data.get('preferred_date', 'Not specified')
customer_name = booking_data.get('name', 'Unknown')
customer_email = booking_data.get('email', 'Not provided')
customer_phone = booking_data.get('phone', 'Not provided')
notes = booking_data.get('notes', 'None')
# Email body (HTML)
email_body = f"""
<html>
<body style="font-family: Arial, sans-serif; padding: 20px;">
<h2 style="color: #2563eb;">New Service Booking!</h2>
<p>A customer has booked a service on your website.</p>
<table style="border-collapse: collapse; width: 100%; max-width: 500px;">
<tr><td style="padding: 8px; font-weight: bold; border-bottom: 1px solid #eee;">Service:</td>
<td style="padding: 8px; border-bottom: 1px solid #eee;">{service_name}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; border-bottom: 1px solid #eee;">Customer Name:</td>
<td style="padding: 8px; border-bottom: 1px solid #eee;">{customer_name}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; border-bottom: 1px solid #eee;">Email:</td>
<td style="padding: 8px; border-bottom: 1px solid #eee;">{customer_email}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; border-bottom: 1px solid #eee;">Phone:</td>
<td style="padding: 8px; border-bottom: 1px solid #eee;">{customer_phone}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; border-bottom: 1px solid #eee;">Preferred Date:</td>
<td style="padding: 8px; border-bottom: 1px solid #eee;">{booking_date}</td></tr>
<tr><td style="padding: 8px; font-weight: bold; border-bottom: 1px solid #eee;">Notes:</td>
<td style="padding: 8px; border-bottom: 1px solid #eee;">{notes}</td></tr>
</table>
<p style="margin-top: 20px;">Please reach out to the customer as soon as possible.</p>
<p style="color: #666;">- PromptTech Solutions</p>
</body>
</html>
"""
# WhatsApp message (plain text)
whatsapp_msg = f"""🔔 NEW BOOKING!
Service: {service_name}
Customer: {customer_name}
Phone: {customer_phone}
Email: {customer_email}
Date: {booking_date}
Notes: {notes}
Please reach out to the customer."""
# Send both notifications (don't block on failures)
await send_email_notification(
subject=f"New Booking: {service_name} - {customer_name}",
body=email_body
)
await send_whatsapp_notification(whatsapp_msg)
# ================== AUTH ROUTES ==================
@api_router.post("/auth/register", response_model=TokenResponse)
@@ -586,6 +741,20 @@ async def book_service(
)
db.add(booking)
await db.commit()
# Send notifications to admin (email + WhatsApp)
try:
await notify_booking({
'name': booking_data.name,
'email': booking_data.email,
'phone': booking_data.phone,
'preferred_date': booking_data.preferred_date,
'notes': booking_data.notes
}, service.name)
except Exception as e:
logger.error(f"Failed to send booking notifications: {e}")
# Don't fail the booking if notifications fail
return {"message": "Booking created successfully", "booking_id": booking.id}
# ================== CART ROUTES ==================
@@ -1601,6 +1770,125 @@ async def admin_update_booking_status(booking_id: str, status: str, user: User =
await db.commit()
return {"message": "Booking status updated"}
@api_router.put("/admin/bookings/{booking_id}/complete")
async def admin_complete_booking(
booking_id: str,
completion_data: BookingComplete,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Mark a booking as completed with diagnosis and work performed details"""
result = await db.execute(select(Booking).where(Booking.id == booking_id))
booking = result.scalar_one_or_none()
if not booking:
raise HTTPException(status_code=404, detail="Booking not found")
booking.status = "completed"
booking.completed_at = datetime.now(timezone.utc)
if completion_data.diagnosis:
booking.diagnosis = completion_data.diagnosis
if completion_data.work_performed:
booking.work_performed = completion_data.work_performed
if completion_data.technician_notes:
booking.technician_notes = completion_data.technician_notes
if completion_data.service_cost is not None:
booking.service_cost = completion_data.service_cost
if completion_data.paid is not None:
booking.paid = completion_data.paid
if completion_data.paid:
booking.paid_at = datetime.now(timezone.utc)
if completion_data.device_model:
booking.device_model = completion_data.device_model
if completion_data.serial_number:
booking.serial_number = completion_data.serial_number
if completion_data.product_number:
booking.product_number = completion_data.product_number
if completion_data.screen_size:
booking.screen_size = completion_data.screen_size
await db.commit()
await db.refresh(booking)
logger.info(f"Booking {booking_id} marked as completed by {user.email}")
return booking_to_dict(booking)
@api_router.get("/admin/bookings/{booking_id}/receipt")
async def admin_get_booking_receipt(
booking_id: str,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Get booking receipt data for printing"""
result = await db.execute(
select(Booking)
.where(Booking.id == booking_id)
.options(selectinload(Booking.service))
)
booking = result.scalar_one_or_none()
if not booking:
raise HTTPException(status_code=404, detail="Booking not found")
# Get service details
service_price = booking.service.price if booking.service else 0
return {
**booking_to_dict(booking),
"service_base_price": service_price,
"final_cost": booking.service_cost if booking.service_cost else service_price,
"company": {
"name": "PromptTech Solutions",
"address": "Belmopan City, Cayo District, Belize",
"phone": "+501 638-6318",
"email": "prompttechbz@gmail.com"
}
}
@api_router.put("/admin/bookings/{booking_id}/update")
async def admin_update_booking(
booking_id: str,
update_data: BookingComplete,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Update booking details without marking as complete"""
result = await db.execute(select(Booking).where(Booking.id == booking_id))
booking = result.scalar_one_or_none()
if not booking:
raise HTTPException(status_code=404, detail="Booking not found")
if update_data.diagnosis is not None:
booking.diagnosis = update_data.diagnosis
if update_data.work_performed is not None:
booking.work_performed = update_data.work_performed
if update_data.technician_notes is not None:
booking.technician_notes = update_data.technician_notes
if update_data.service_cost is not None:
booking.service_cost = update_data.service_cost
await db.commit()
await db.refresh(booking)
logger.info(f"Booking {booking_id} updated by {user.email}")
return booking_to_dict(booking)
@api_router.delete("/admin/bookings/{booking_id}")
async def admin_delete_booking(
booking_id: str,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Delete a booking"""
result = await db.execute(select(Booking).where(Booking.id == booking_id))
booking = result.scalar_one_or_none()
if not booking:
raise HTTPException(status_code=404, detail="Booking not found")
await db.delete(booking)
await db.commit()
logger.info(f"Booking {booking_id} deleted by {user.email}")
return {"message": "Booking deleted successfully"}
# Admin - Users Management
# Admin - Reports
@@ -1642,6 +1930,7 @@ async def admin_get_sales_report(
bookings_result = await db.execute(
select(Booking)
.where(and_(Booking.created_at >= start, Booking.created_at <= end))
.options(selectinload(Booking.service))
)
bookings = bookings_result.scalars().all()
@@ -1661,7 +1950,11 @@ async def admin_get_sales_report(
"orders": 0,
"revenue": 0,
"products_sold": 0,
"order_statuses": {}
"order_statuses": {},
"services_booked": 0,
"services_completed": 0,
"services_paid": 0,
"service_revenue": 0
}
report_data[key]["orders"] += 1
@@ -1671,7 +1964,11 @@ async def admin_get_sales_report(
status = order.status.value if order.status else "unknown"
report_data[key]["order_statuses"][status] = report_data[key]["order_statuses"].get(status, 0) + 1
# Add booking counts
# Add booking counts and service revenue
total_service_revenue = 0
total_services_completed = 0
total_services_paid = 0
for booking in bookings:
if period == "daily":
key = booking.created_at.strftime("%Y-%m-%d")
@@ -1686,10 +1983,26 @@ async def admin_get_sales_report(
"orders": 0,
"revenue": 0,
"products_sold": 0,
"order_statuses": {}
"order_statuses": {},
"services_booked": 0,
"services_completed": 0,
"services_paid": 0,
"service_revenue": 0
}
report_data[key]["services_booked"] = report_data[key].get("services_booked", 0) + 1
if booking.status == "completed":
report_data[key]["services_completed"] = report_data[key].get("services_completed", 0) + 1
total_services_completed += 1
if booking.paid:
report_data[key]["services_paid"] = report_data[key].get("services_paid", 0) + 1
total_services_paid += 1
# Use service_cost if set, otherwise use service base price
cost = booking.service_cost if booking.service_cost else (booking.service.price if booking.service else 0)
report_data[key]["service_revenue"] = report_data[key].get("service_revenue", 0) + cost
total_service_revenue += cost
# Calculate totals
total_orders = len(orders)
@@ -1706,6 +2019,10 @@ async def admin_get_sales_report(
"total_revenue": total_revenue,
"total_products_sold": total_products,
"total_services_booked": total_bookings,
"total_services_completed": total_services_completed,
"total_services_paid": total_services_paid,
"total_service_revenue": total_service_revenue,
"combined_revenue": total_revenue + total_service_revenue,
"average_order_value": total_revenue / total_orders if total_orders > 0 else 0
},
"data": list(report_data.values())
@@ -2321,12 +2638,7 @@ async def admin_update_about_content(
db: AsyncSession = Depends(get_db)
):
"""Update about content section"""
try:
content_uuid = uuid.UUID(content_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid content ID format")
result = await db.execute(select(AboutContent).where(AboutContent.id == content_uuid))
result = await db.execute(select(AboutContent).where(AboutContent.id == content_id))
content = result.scalar_one_or_none()
if not content:
@@ -2374,12 +2686,7 @@ async def admin_delete_about_content(
db: AsyncSession = Depends(get_db)
):
"""Delete about content section"""
try:
content_uuid = uuid.UUID(content_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid content ID format")
result = await db.execute(select(AboutContent).where(AboutContent.id == content_uuid))
result = await db.execute(select(AboutContent).where(AboutContent.id == content_id))
content = result.scalar_one_or_none()
if not content:
@@ -2464,12 +2771,7 @@ async def admin_update_team_member(
db: AsyncSession = Depends(get_db)
):
"""Update team member"""
try:
member_uuid = uuid.UUID(member_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid member ID format")
result = await db.execute(select(TeamMember).where(TeamMember.id == member_uuid))
result = await db.execute(select(TeamMember).where(TeamMember.id == member_id))
member = result.scalar_one_or_none()
if not member:
@@ -2519,12 +2821,7 @@ async def admin_delete_team_member(
db: AsyncSession = Depends(get_db)
):
"""Delete team member"""
try:
member_uuid = uuid.UUID(member_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid member ID format")
result = await db.execute(select(TeamMember).where(TeamMember.id == member_uuid))
result = await db.execute(select(TeamMember).where(TeamMember.id == member_id))
member = result.scalar_one_or_none()
if not member:
@@ -2600,12 +2897,7 @@ async def admin_update_company_value(
db: AsyncSession = Depends(get_db)
):
"""Update company value"""
try:
value_uuid = uuid.UUID(value_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid value ID format")
result = await db.execute(select(CompanyValue).where(CompanyValue.id == value_uuid))
result = await db.execute(select(CompanyValue).where(CompanyValue.id == value_id))
value = result.scalar_one_or_none()
if not value:
@@ -2646,12 +2938,7 @@ async def admin_delete_company_value(
db: AsyncSession = Depends(get_db)
):
"""Delete company value"""
try:
value_uuid = uuid.UUID(value_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid value ID format")
result = await db.execute(select(CompanyValue).where(CompanyValue.id == value_uuid))
result = await db.execute(select(CompanyValue).where(CompanyValue.id == value_id))
value = result.scalar_one_or_none()
if not value:
@@ -2717,6 +3004,385 @@ async def seed_data(db: AsyncSession = Depends(get_db)):
async def root():
return {"message": "TechZone API is running", "version": "2.0.0", "status": "healthy"}
# ============================================
# MEDIA MANAGEMENT ENDPOINTS
# ============================================
def get_media_type(filename: str) -> MediaType:
"""Determine media type from file extension"""
ext = Path(filename).suffix.lower()
if ext in ALLOWED_IMAGE_EXTENSIONS:
return MediaType.IMAGE
elif ext in ALLOWED_DOC_EXTENSIONS:
return MediaType.DOCUMENT
elif ext in ALLOWED_VIDEO_EXTENSIONS:
return MediaType.VIDEO
return MediaType.OTHER
def get_image_dimensions(file_path: Path) -> tuple:
"""Get image dimensions if PIL is available"""
try:
from PIL import Image
with Image.open(file_path) as img:
return img.size
except:
return None, None
@api_router.get("/media")
async def get_all_media(
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
media_type: str = Query(None),
search: str = Query(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get all media files with pagination"""
if current_user.role not in [UserRole.ADMIN, UserRole.EMPLOYEE]:
raise HTTPException(status_code=403, detail="Not authorized")
try:
query = select(Media).where(Media.is_active == True)
if media_type:
query = query.where(Media.media_type == MediaType(media_type))
if search:
query = query.where(
or_(
Media.original_filename.ilike(f"%{search}%"),
Media.title.ilike(f"%{search}%"),
Media.alt_text.ilike(f"%{search}%")
)
)
# Get total count
count_query = select(func.count(Media.id)).where(Media.is_active == True)
if media_type:
count_query = count_query.where(Media.media_type == MediaType(media_type))
if search:
count_query = count_query.where(
or_(
Media.original_filename.ilike(f"%{search}%"),
Media.title.ilike(f"%{search}%")
)
)
total_result = await db.execute(count_query)
total = total_result.scalar()
# Apply pagination
offset = (page - 1) * limit
query = query.order_by(Media.created_at.desc()).offset(offset).limit(limit)
result = await db.execute(query)
media_items = result.scalars().all()
return {
"items": [{
"id": m.id,
"filename": m.filename,
"original_filename": m.original_filename,
"file_url": m.file_url,
"file_size": m.file_size,
"mime_type": m.mime_type,
"media_type": m.media_type.value,
"alt_text": m.alt_text,
"title": m.title,
"width": m.width,
"height": m.height,
"created_at": m.created_at.isoformat() if m.created_at else None
} for m in media_items],
"total": total,
"page": page,
"limit": limit,
"pages": (total + limit - 1) // limit
}
except Exception as e:
logger.error(f"Error fetching media: {e}")
raise HTTPException(status_code=500, detail=str(e))
@api_router.post("/media/upload")
async def upload_media(
file: UploadFile = File(...),
alt_text: str = Form(None),
title: str = Form(None),
description: str = Form(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Upload a media file"""
if current_user.role not in [UserRole.ADMIN, UserRole.EMPLOYEE]:
raise HTTPException(status_code=403, detail="Not authorized")
try:
# Validate file
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
# Check file extension
ext = Path(file.filename).suffix.lower()
all_allowed = ALLOWED_IMAGE_EXTENSIONS | ALLOWED_DOC_EXTENSIONS | ALLOWED_VIDEO_EXTENSIONS
if ext not in all_allowed:
raise HTTPException(
status_code=400,
detail=f"File type not allowed. Allowed: {', '.join(all_allowed)}"
)
# Read file content
content = await file.read()
file_size = len(content)
# Check file size
if file_size > MAX_FILE_SIZE:
raise HTTPException(
status_code=400,
detail=f"File too large. Maximum size: {MAX_FILE_SIZE // (1024*1024)}MB"
)
# Generate unique filename
unique_filename = f"{uuid.uuid4()}{ext}"
file_path = MEDIA_UPLOAD_DIR / unique_filename
# Save file
async with aiofiles.open(file_path, 'wb') as f:
await f.write(content)
# Get image dimensions if applicable
width, height = None, None
if ext in ALLOWED_IMAGE_EXTENSIONS:
width, height = get_image_dimensions(file_path)
# Determine media type
media_type = get_media_type(file.filename)
# Create media record
media = Media(
filename=unique_filename,
original_filename=file.filename,
file_path=str(file_path),
file_url=f"/uploads/media/{unique_filename}",
file_size=file_size,
mime_type=file.content_type,
media_type=media_type,
alt_text=alt_text or file.filename,
title=title or Path(file.filename).stem,
description=description,
width=width,
height=height,
uploaded_by=current_user.id
)
db.add(media)
await db.commit()
await db.refresh(media)
logger.info(f"Media uploaded: {unique_filename} by {current_user.email}")
return {
"id": media.id,
"filename": media.filename,
"original_filename": media.original_filename,
"file_url": media.file_url,
"file_size": media.file_size,
"mime_type": media.mime_type,
"media_type": media.media_type.value,
"alt_text": media.alt_text,
"title": media.title,
"width": media.width,
"height": media.height,
"created_at": media.created_at.isoformat() if media.created_at else None
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error uploading media: {e}")
raise HTTPException(status_code=500, detail=str(e))
@api_router.post("/media/upload-multiple")
async def upload_multiple_media(
files: List[UploadFile] = File(...),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Upload multiple media files at once"""
if current_user.role not in [UserRole.ADMIN, UserRole.EMPLOYEE]:
raise HTTPException(status_code=403, detail="Not authorized")
uploaded = []
errors = []
for file in files:
try:
if not file.filename:
continue
ext = Path(file.filename).suffix.lower()
all_allowed = ALLOWED_IMAGE_EXTENSIONS | ALLOWED_DOC_EXTENSIONS | ALLOWED_VIDEO_EXTENSIONS
if ext not in all_allowed:
errors.append({"filename": file.filename, "error": "File type not allowed"})
continue
content = await file.read()
file_size = len(content)
if file_size > MAX_FILE_SIZE:
errors.append({"filename": file.filename, "error": "File too large"})
continue
unique_filename = f"{uuid.uuid4()}{ext}"
file_path = MEDIA_UPLOAD_DIR / unique_filename
async with aiofiles.open(file_path, 'wb') as f:
await f.write(content)
width, height = None, None
if ext in ALLOWED_IMAGE_EXTENSIONS:
width, height = get_image_dimensions(file_path)
media_type = get_media_type(file.filename)
media = Media(
filename=unique_filename,
original_filename=file.filename,
file_path=str(file_path),
file_url=f"/uploads/media/{unique_filename}",
file_size=file_size,
mime_type=file.content_type,
media_type=media_type,
alt_text=file.filename,
title=Path(file.filename).stem,
width=width,
height=height,
uploaded_by=current_user.id
)
db.add(media)
uploaded.append({
"filename": file.filename,
"file_url": media.file_url
})
except Exception as e:
errors.append({"filename": file.filename, "error": str(e)})
await db.commit()
return {
"uploaded": uploaded,
"errors": errors,
"total_uploaded": len(uploaded),
"total_errors": len(errors)
}
@api_router.get("/media/{media_id}")
async def get_media(
media_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get a single media item"""
if current_user.role not in [UserRole.ADMIN, UserRole.EMPLOYEE]:
raise HTTPException(status_code=403, detail="Not authorized")
result = await db.execute(select(Media).where(Media.id == media_id))
media = result.scalar_one_or_none()
if not media:
raise HTTPException(status_code=404, detail="Media not found")
return {
"id": media.id,
"filename": media.filename,
"original_filename": media.original_filename,
"file_url": media.file_url,
"file_size": media.file_size,
"mime_type": media.mime_type,
"media_type": media.media_type.value,
"alt_text": media.alt_text,
"title": media.title,
"description": media.description,
"width": media.width,
"height": media.height,
"created_at": media.created_at.isoformat() if media.created_at else None
}
@api_router.put("/media/{media_id}")
async def update_media(
media_id: str,
alt_text: str = Form(None),
title: str = Form(None),
description: str = Form(None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update media metadata"""
if current_user.role not in [UserRole.ADMIN, UserRole.EMPLOYEE]:
raise HTTPException(status_code=403, detail="Not authorized")
result = await db.execute(select(Media).where(Media.id == media_id))
media = result.scalar_one_or_none()
if not media:
raise HTTPException(status_code=404, detail="Media not found")
if alt_text is not None:
media.alt_text = alt_text
if title is not None:
media.title = title
if description is not None:
media.description = description
await db.commit()
await db.refresh(media)
return {
"id": media.id,
"filename": media.filename,
"file_url": media.file_url,
"alt_text": media.alt_text,
"title": media.title,
"description": media.description
}
@api_router.delete("/media/{media_id}")
async def delete_media(
media_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete a media file"""
if current_user.role != UserRole.ADMIN:
raise HTTPException(status_code=403, detail="Admin access required")
result = await db.execute(select(Media).where(Media.id == media_id))
media = result.scalar_one_or_none()
if not media:
raise HTTPException(status_code=404, detail="Media not found")
# Delete physical file
try:
file_path = Path(media.file_path)
if file_path.exists():
file_path.unlink()
except Exception as e:
logger.warning(f"Could not delete file {media.file_path}: {e}")
# Delete from database
await db.delete(media)
await db.commit()
logger.info(f"Media deleted: {media.filename} by {current_user.email}")
return {"message": "Media deleted successfully"}
@api_router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""Comprehensive health check endpoint"""
@@ -2740,20 +3406,22 @@ async def health_check(db: AsyncSession = Depends(get_db)):
"timestamp": datetime.now(timezone.utc).isoformat()
}
# Add CORS middleware BEFORE including routes
app.add_middleware(
CORSMiddleware,
allow_credentials=True,
allow_origins=["http://localhost:5300", "http://localhost:3000", "http://127.0.0.1:5300", "http://prompttech.dynns.com"],
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
allow_headers=["*"],
expose_headers=["*"],
)
# Include the router
app.include_router(api_router)
# Mount static files for uploaded images
app.mount("/uploads", StaticFiles(directory=str(ROOT_DIR / "uploads")), name="uploads")
app.add_middleware(
CORSMiddleware,
allow_credentials=True,
allow_origins=os.environ.get('CORS_ORIGINS', '*').split(','),
allow_methods=["*"],
allow_headers=["*"],
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8181)