Files

2760 lines
98 KiB
Python
Raw Permalink Normal View History

2026-01-27 18:07:00 -06:00
from fastapi import FastAPI, APIRouter, HTTPException, Depends, status, Query, Response, UploadFile, File, Form
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
from fastapi.staticfiles import StaticFiles
from dotenv import load_dotenv
from starlette.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc, asc, distinct, delete
from sqlalchemy.orm import selectinload
import os
import logging
from pathlib import Path
from pydantic import BaseModel, Field, EmailStr, ConfigDict
from typing import List, Optional, Dict, Any
import uuid
from datetime import datetime, timezone, timedelta
import bcrypt
import jwt
import io
import csv
import base64
import shutil
from contextlib import asynccontextmanager
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter, A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from database import get_db, init_db, AsyncSessionLocal
from models import (
User, Product, Service, CartItem, Order, OrderItem, OrderStatusHistory,
Review, Booking, Contact, InventoryLog, Category, SalesReport,
OrderStatus, UserRole, Base, ProductImage, ServiceImage,
AboutContent, TeamMember, CompanyValue
)
ROOT_DIR = Path(__file__).parent
load_dotenv(ROOT_DIR / '.env')
# Create uploads directory for images
UPLOAD_DIR = ROOT_DIR / 'uploads' / 'products'
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# JWT Configuration
SECRET_KEY = os.environ.get('JWT_SECRET', 'techzone-super-secret-key-2024-production')
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Lifespan event handler for FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize database with error handling and verification"""
try:
await init_db()
logger.info("Database initialized successfully")
# Verify database connection
try:
async with AsyncSessionLocal() as session:
result = await session.execute(select(func.count(User.id)))
user_count = result.scalar()
logger.info(f"Database connection verified - {user_count} users found")
except Exception as e:
logger.error(f"Database verification failed: {e}")
except Exception as e:
logger.critical(f"Database initialization failed: {e}", exc_info=True)
yield
# Cleanup on shutdown (if needed)
pass
# Create the main app with lifespan
app = FastAPI(title="TechZone API", version="2.0.0", lifespan=lifespan)
# Create a router with the /api prefix
api_router = APIRouter(prefix="/api")
security = HTTPBearer()
# ================== PYDANTIC MODELS ==================
class UserCreate(BaseModel):
email: EmailStr
name: str
password: str
class UserLogin(BaseModel):
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
user: dict
class ProductCreate(BaseModel):
name: str
description: str # Now supports HTML from rich text editor
price: float
category: str
image_url: str = "" # Optional - for backwards compatibility
stock: int = 10
low_stock_threshold: int = 5
brand: str = ""
specs: dict = {}
images: List[str] = [] # List of image URLs
class ProductUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None # Supports HTML
price: Optional[float] = None
category: Optional[str] = None
image_url: Optional[str] = None
stock: Optional[int] = None
low_stock_threshold: Optional[int] = None
brand: Optional[str] = None
specs: Optional[dict] = None
is_active: Optional[bool] = None
images: Optional[List[str]] = None # List of image URLs
class CategoryCreate(BaseModel):
name: str
slug: Optional[str] = None
description: Optional[str] = ""
class CategoryUpdate(BaseModel):
name: Optional[str] = None
slug: Optional[str] = None
description: Optional[str] = None
class ServiceCreate(BaseModel):
name: str
description: str
price: float
duration: str
image_url: Optional[str] = "" # Deprecated
category: str
images: List[str] = []
class ServiceUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
price: Optional[float] = None
duration: Optional[str] = None
image_url: Optional[str] = None # Deprecated
category: Optional[str] = None
images: Optional[List[str]] = None
category: Optional[str] = None
images: Optional[List[str]] = None
is_active: Optional[bool] = None
class CartItemCreate(BaseModel):
product_id: str
quantity: int = 1
class OrderCreate(BaseModel):
shipping_address: dict = {}
notes: str = ""
class OrderStatusUpdate(BaseModel):
status: str
notes: str = ""
tracking_number: Optional[str] = None
class ReviewCreate(BaseModel):
product_id: Optional[str] = None
service_id: Optional[str] = None
rating: int
title: str = ""
comment: str = ""
class BookingCreate(BaseModel):
service_id: str
name: str
email: EmailStr
phone: str
preferred_date: str
notes: str = ""
class ContactCreate(BaseModel):
name: str
email: EmailStr
subject: str
message: str
class InventoryAdjust(BaseModel):
quantity_change: int
notes: str = ""
# ================== HELPERS ==================
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), db: AsyncSession = Depends(get_db)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
async def get_admin_user(user: User = Depends(get_current_user)):
"""Verify user has admin role with logging"""
if not user:
logger.warning("Admin access attempted with no user")
raise HTTPException(status_code=401, detail="Authentication required")
if user.role != UserRole.ADMIN:
logger.warning(f"Non-admin user {user.id} attempted admin access")
raise HTTPException(status_code=403, detail="Admin access required")
logger.debug(f"Admin access granted to user {user.id}")
return user
async def get_optional_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False)), db: AsyncSession = Depends(get_db)):
if credentials is None:
return None
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id:
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
except:
pass
return None
# ================== CRUD HELPERS ==================
async def _get_or_404(db: AsyncSession, model, record_id: str, error_message: str = "Record not found"):
"""Generic helper to fetch a record by ID or raise 404"""
result = await db.execute(select(model).where(model.id == record_id))
record = result.scalar_one_or_none()
if not record:
raise HTTPException(status_code=404, detail=error_message)
return record
async def _soft_delete(db: AsyncSession, record, commit: bool = True):
"""Generic helper for soft delete (set is_active=False)"""
record.is_active = False
if commit:
await db.commit()
return {"message": f"{record.__class__.__name__} deleted"}
def _build_response(message: str, **kwargs):
"""Build standardized API response"""
response = {"message": message}
response.update(kwargs)
return response
# ================== SERIALIZATION HELPERS ==================
def _safe_isoformat(dt) -> Optional[str]:
"""Safely convert datetime to ISO format"""
return dt.isoformat() if dt else None
def _safe_enum_value(enum_val, default="pending") -> str:
"""Safely extract enum value"""
return enum_val.value if enum_val else default
def _calculate_reviews_stats(reviews: list) -> dict:
"""Calculate review statistics"""
if not reviews:
return {"average_rating": 0, "review_count": 0}
return {
"average_rating": sum(r.rating for r in reviews) / len(reviews),
"review_count": len(reviews)
}
def user_to_dict(user: User) -> dict:
return {
"id": user.id,
"email": user.email,
"name": user.name,
"role": _safe_enum_value(user.role, "user"),
"is_active": getattr(user, 'is_active', True),
"created_at": _safe_isoformat(user.created_at)
}
def product_to_dict(product: Product, include_reviews: bool = False) -> dict:
# Get images if available
images = []
if hasattr(product, 'images') and product.images:
images = [
{
"id": img.id,
"url": img.image_url,
"display_order": img.display_order,
"is_primary": img.is_primary
}
for img in sorted(product.images, key=lambda x: x.display_order)
]
data = {
"id": product.id,
"name": product.name,
"description": product.description, # Contains HTML from rich text editor
"price": product.price,
"category": product.category,
"image_url": product.image_url or (images[0]["url"] if images else ""), # Fallback to first image
"images": images, # New: array of all images
"stock": product.stock,
"low_stock_threshold": product.low_stock_threshold,
"brand": product.brand,
"specs": product.specs or {},
"is_active": product.is_active,
"created_at": _safe_isoformat(product.created_at)
}
if include_reviews and product.reviews:
data["reviews"] = [review_to_dict(r) for r in product.reviews]
data.update(_calculate_reviews_stats(product.reviews))
return data
def service_to_dict(service: Service, include_reviews: bool = False) -> dict:
# Get images if available
images = []
if hasattr(service, 'images') and service.images:
images = [
{
"id": img.id,
"url": img.image_url,
"display_order": img.display_order,
"is_primary": img.is_primary
}
for img in sorted(service.images, key=lambda x: x.display_order)
]
# Set primary image_url for backwards compatibility
primary_image = images[0]["url"] if images else service.image_url
data = {
"id": service.id,
"name": service.name,
"description": service.description,
"price": service.price,
"duration": service.duration,
"image_url": primary_image,
"images": images,
"category": service.category,
"is_active": service.is_active,
"created_at": _safe_isoformat(service.created_at)
}
if include_reviews and service.reviews:
data["reviews"] = [review_to_dict(r) for r in service.reviews]
data.update(_calculate_reviews_stats(service.reviews))
return data
def order_to_dict(order: Order) -> dict:
return {
"id": order.id,
"user_id": order.user_id,
"status": _safe_enum_value(order.status),
"subtotal": order.subtotal,
"tax": order.tax,
"shipping": order.shipping,
"total": order.total,
"shipping_address": order.shipping_address or {},
"notes": order.notes,
"tracking_number": order.tracking_number,
"created_at": _safe_isoformat(order.created_at),
"updated_at": _safe_isoformat(order.updated_at),
"items": [order_item_to_dict(item) for item in order.items] if order.items else [],
"status_history": [status_history_to_dict(h) for h in order.status_history] if order.status_history else []
}
def order_item_to_dict(item: OrderItem) -> dict:
return {
"id": item.id,
"product_id": item.product_id,
"product_name": item.product_name,
"product_image": item.product_image,
"quantity": item.quantity,
"price": item.price
}
def status_history_to_dict(history: OrderStatusHistory) -> dict:
return {
"id": history.id,
"status": _safe_enum_value(history.status, None),
"notes": history.notes,
"created_at": _safe_isoformat(history.created_at)
}
def review_to_dict(review: Review) -> dict:
return {
"id": review.id,
"user_id": review.user_id,
"user_name": review.user.name if review.user else "Anonymous",
"product_id": review.product_id,
"service_id": review.service_id,
"rating": review.rating,
"title": review.title,
"comment": review.comment,
"is_verified_purchase": review.is_verified_purchase,
"created_at": _safe_isoformat(review.created_at)
}
def booking_to_dict(booking: Booking) -> dict:
return {
"id": booking.id,
"service_id": booking.service_id,
"service_name": booking.service_name,
"name": booking.name,
"email": booking.email,
"phone": booking.phone,
"preferred_date": booking.preferred_date,
"notes": booking.notes,
"status": booking.status,
"created_at": booking.created_at.isoformat() if booking.created_at else None
}
# ================== AUTH ROUTES ==================
@api_router.post("/auth/register", response_model=TokenResponse)
async def register(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.email == user_data.email))
if result.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Email already registered")
user = User(
email=user_data.email,
name=user_data.name,
password=hash_password(user_data.password),
role=UserRole.USER
)
db.add(user)
await db.commit()
await db.refresh(user)
token = create_access_token({"sub": user.id})
return TokenResponse(access_token=token, user=user_to_dict(user))
@api_router.post("/auth/login", response_model=TokenResponse)
async def login(credentials: UserLogin, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.email == credentials.email))
user = result.scalar_one_or_none()
if not user or not verify_password(credentials.password, user.password):
raise HTTPException(status_code=401, detail="Invalid credentials")
token = create_access_token({"sub": user.id})
return TokenResponse(access_token=token, user=user_to_dict(user))
@api_router.get("/auth/me")
async def get_me(user: User = Depends(get_current_user)):
return user_to_dict(user)
# ================== PRODUCTS ROUTES ==================
@api_router.get("/products")
async def get_products(
response: Response,
category: Optional[str] = None,
search: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
in_stock: Optional[bool] = None,
db: AsyncSession = Depends(get_db)
):
# Add cache headers for better performance
response.headers["Cache-Control"] = "public, max-age=60" # Cache for 60 seconds
query = select(Product).where(Product.is_active == True).options(selectinload(Product.images))
if category and category != "all":
query = query.where(Product.category == category)
if search:
query = query.where(
or_(
Product.name.ilike(f"%{search}%"),
Product.description.ilike(f"%{search}%"),
Product.brand.ilike(f"%{search}%")
)
)
if min_price is not None:
query = query.where(Product.price >= min_price)
if max_price is not None:
query = query.where(Product.price <= max_price)
if in_stock:
query = query.where(Product.stock > 0)
query = query.options(selectinload(Product.reviews).selectinload(Review.user))
result = await db.execute(query)
products = result.scalars().all()
return [product_to_dict(p, include_reviews=True) for p in products]
@api_router.get("/products/{product_id}")
async def get_product(product_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Product)
.where(Product.id == product_id)
.options(
selectinload(Product.images),
selectinload(Product.reviews).selectinload(Review.user)
)
)
product = result.scalar_one_or_none()
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product_to_dict(product, include_reviews=True)
@api_router.get("/products/categories/list")
async def get_product_categories(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product.category).distinct())
categories = [row[0] for row in result.fetchall()]
return categories
# ================== SERVICES ROUTES ==================
@api_router.get("/services")
async def get_services(
response: Response,
category: Optional[str] = None,
db: AsyncSession = Depends(get_db)
):
# Add cache headers for better performance
response.headers["Cache-Control"] = "public, max-age=60" # Cache for 60 seconds
query = select(Service).where(Service.is_active == True).options(selectinload(Service.images))
if category and category != "all":
query = query.where(Service.category == category)
query = query.options(selectinload(Service.reviews).selectinload(Review.user))
result = await db.execute(query)
services = result.scalars().all()
return [service_to_dict(s, include_reviews=True) for s in services]
@api_router.get("/services/{service_id}")
async def get_service(service_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Service)
.where(Service.id == service_id)
.options(
selectinload(Service.images),
selectinload(Service.reviews).selectinload(Review.user)
)
)
service = result.scalar_one_or_none()
if not service:
raise HTTPException(status_code=404, detail="Service not found")
return service_to_dict(service, include_reviews=True)
@api_router.post("/services/book")
async def book_service(
booking_data: BookingCreate,
user: Optional[User] = Depends(get_optional_user),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(select(Service).where(Service.id == booking_data.service_id))
service = result.scalar_one_or_none()
if not service:
raise HTTPException(status_code=404, detail="Service not found")
booking = Booking(
service_id=booking_data.service_id,
user_id=user.id if user else None,
name=booking_data.name,
email=booking_data.email,
phone=booking_data.phone,
preferred_date=booking_data.preferred_date,
notes=booking_data.notes,
service_name=service.name
)
db.add(booking)
await db.commit()
return {"message": "Booking created successfully", "booking_id": booking.id}
# ================== CART ROUTES ==================
@api_router.get("/cart")
async def get_cart(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(CartItem)
.where(CartItem.user_id == user.id)
.options(
selectinload(CartItem.product).selectinload(Product.images)
)
)
cart_items = result.scalars().all()
return [{
"id": item.id,
"product_id": item.product_id,
"quantity": item.quantity,
"product": product_to_dict(item.product) if item.product else None
} for item in cart_items]
@api_router.post("/cart/add")
async def add_to_cart(item: CartItemCreate, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product).where(Product.id == item.product_id))
product = result.scalar_one_or_none()
if not product:
raise HTTPException(status_code=404, detail="Product not found")
result = await db.execute(
select(CartItem).where(
and_(CartItem.user_id == user.id, CartItem.product_id == item.product_id)
)
)
existing = result.scalar_one_or_none()
if existing:
existing.quantity += item.quantity
else:
cart_item = CartItem(user_id=user.id, product_id=item.product_id, quantity=item.quantity)
db.add(cart_item)
await db.commit()
return {"message": "Item added to cart"}
@api_router.put("/cart/{item_id}")
async def update_cart_item(item_id: str, quantity: int = Query(...), user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(CartItem).where(and_(CartItem.id == item_id, CartItem.user_id == user.id))
)
item = result.scalar_one_or_none()
if not item:
raise HTTPException(status_code=404, detail="Cart item not found")
if quantity <= 0:
await db.delete(item)
else:
item.quantity = quantity
await db.commit()
return {"message": "Cart updated"}
@api_router.delete("/cart/{item_id}")
async def remove_from_cart(item_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(CartItem).where(and_(CartItem.id == item_id, CartItem.user_id == user.id))
)
item = result.scalar_one_or_none()
if item:
await db.delete(item)
await db.commit()
return {"message": "Item removed from cart"}
@api_router.delete("/cart")
async def clear_cart(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
await db.execute(
CartItem.__table__.delete().where(CartItem.user_id == user.id)
)
await db.commit()
return {"message": "Cart cleared"}
# ================== ORDERS ROUTES ==================
@api_router.post("/orders")
async def create_order(order_data: OrderCreate, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
# Get cart items
result = await db.execute(
select(CartItem)
.where(CartItem.user_id == user.id)
.options(selectinload(CartItem.product))
)
cart_items = result.scalars().all()
if not cart_items:
raise HTTPException(status_code=400, detail="Cart is empty")
# Calculate totals
subtotal = sum(item.product.price * item.quantity for item in cart_items)
tax = subtotal * 0.08
shipping = 0 if subtotal > 100 else 9.99
total = subtotal + tax + shipping
# Create order
order = Order(
user_id=user.id,
status=OrderStatus.PENDING,
subtotal=subtotal,
tax=tax,
shipping=shipping,
total=total,
shipping_address=order_data.shipping_address,
notes=order_data.notes
)
db.add(order)
await db.flush()
# Create order items and update inventory
for cart_item in cart_items:
product = cart_item.product
order_item = OrderItem(
order_id=order.id,
product_id=product.id,
quantity=cart_item.quantity,
price=product.price,
product_name=product.name,
product_image=product.image_url
)
db.add(order_item)
# Update stock
previous_stock = product.stock
product.stock = max(0, product.stock - cart_item.quantity)
# Log inventory change
inv_log = InventoryLog(
product_id=product.id,
action="sale",
quantity_change=-cart_item.quantity,
previous_stock=previous_stock,
new_stock=product.stock,
notes=f"Order {order.id}",
created_by=user.id
)
db.add(inv_log)
# Add status history
status_history = OrderStatusHistory(
order_id=order.id,
status=OrderStatus.PENDING,
notes="Order placed",
created_by=user.id
)
db.add(status_history)
# Clear cart
await db.execute(CartItem.__table__.delete().where(CartItem.user_id == user.id))
await db.commit()
await db.refresh(order)
return {"message": "Order created successfully", "order_id": order.id}
@api_router.get("/orders")
async def get_orders(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Order)
.where(Order.user_id == user.id)
.options(selectinload(Order.items), selectinload(Order.status_history))
.order_by(desc(Order.created_at))
)
orders = result.scalars().all()
return [order_to_dict(o) for o in orders]
@api_router.get("/orders/{order_id}")
async def get_order(order_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Order)
.where(and_(Order.id == order_id, Order.user_id == user.id))
.options(selectinload(Order.items), selectinload(Order.status_history))
)
order = result.scalar_one_or_none()
if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order_to_dict(order)
# ================== REVIEWS ROUTES ==================
@api_router.post("/reviews")
async def create_review(review_data: ReviewCreate, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
if not review_data.product_id and not review_data.service_id:
raise HTTPException(status_code=400, detail="Product or service ID required")
if review_data.rating < 1 or review_data.rating > 5:
raise HTTPException(status_code=400, detail="Rating must be between 1 and 5")
# Check for verified purchase
is_verified = False
if review_data.product_id:
result = await db.execute(
select(OrderItem)
.join(Order)
.where(
and_(
Order.user_id == user.id,
OrderItem.product_id == review_data.product_id,
Order.status.in_([OrderStatus.DELIVERED, OrderStatus.SHIPPED])
)
)
)
if result.scalar_one_or_none():
is_verified = True
review = Review(
user_id=user.id,
product_id=review_data.product_id,
service_id=review_data.service_id,
rating=review_data.rating,
title=review_data.title,
comment=review_data.comment,
is_verified_purchase=is_verified
)
db.add(review)
await db.commit()
await db.refresh(review)
return {"message": "Review submitted successfully", "review_id": review.id}
@api_router.get("/reviews/product/{product_id}")
async def get_product_reviews(product_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Review)
.where(and_(Review.product_id == product_id, Review.is_approved == True))
.options(selectinload(Review.user))
.order_by(desc(Review.created_at))
)
reviews = result.scalars().all()
return [review_to_dict(r) for r in reviews]
@api_router.get("/reviews/service/{service_id}")
async def get_service_reviews(service_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Review)
.where(and_(Review.service_id == service_id, Review.is_approved == True))
.options(selectinload(Review.user))
.order_by(desc(Review.created_at))
)
reviews = result.scalars().all()
return [review_to_dict(r) for r in reviews]
# ================== CONTACT ROUTES ==================
@api_router.post("/contact")
async def submit_contact(contact_data: ContactCreate, db: AsyncSession = Depends(get_db)):
contact = Contact(
name=contact_data.name,
email=contact_data.email,
subject=contact_data.subject,
message=contact_data.message
)
db.add(contact)
await db.commit()
return {"message": "Message sent successfully", "id": contact.id}
# ================== ADMIN ROUTES ==================
# Admin - Dashboard Stats
@api_router.get("/admin/dashboard")
async def get_admin_dashboard(user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
"""Admin dashboard with comprehensive stats and error handling"""
async def safe_scalar(query, default=0, error_msg=""):
"""Execute query and return scalar with error handling"""
try:
result = await db.execute(query)
value = result.scalar()
return float(value) if value is not None and isinstance(value, (int, float)) else default
except Exception as e:
if error_msg:
logger.error(f"{error_msg}: {e}")
return default
try:
today = datetime.now(timezone.utc).date()
month_ago = today - timedelta(days=30)
month_start = datetime.combine(month_ago, datetime.min.time())
# Batch count queries for better performance
counts_queries = {
"products": select(func.count(Product.id)),
"services": select(func.count(Service.id)),
"users": select(func.count(User.id)),
"orders": select(func.count(Order.id))
}
counts = {}
for key, query in counts_queries.items():
counts[key] = await safe_scalar(query, 0, f"Error fetching {key} count")
# Revenue queries
total_revenue = await safe_scalar(
select(func.sum(Order.total)),
0.0, "Error fetching total revenue"
)
monthly_revenue = await safe_scalar(
select(func.sum(Order.total)).where(Order.created_at >= month_start),
0.0, "Error fetching monthly revenue"
)
# Today's stats - batch queries
today_orders = await safe_scalar(
select(func.count(Order.id)).where(func.date(Order.created_at) == today),
0, "Error fetching today's orders"
)
today_revenue = await safe_scalar(
select(func.sum(Order.total)).where(func.date(Order.created_at) == today),
0.0, "Error fetching today's revenue"
)
pending_bookings = await safe_scalar(
select(func.count(Booking.id)).where(Booking.status == "pending"),
0, "Error fetching pending bookings"
)
# Low stock products
low_stock_products = []
try:
low_stock_result = await db.execute(
select(Product)
.where(Product.stock <= Product.low_stock_threshold, Product.is_active == True)
)
low_stock_products = [{
"id": p.id, "name": p.name, "stock": p.stock,
"low_stock_threshold": p.low_stock_threshold, "category": p.category
} for p in low_stock_result.scalars().all()]
except Exception as e:
logger.error(f"Error fetching low stock products: {e}")
# Recent orders
recent_orders_data = []
try:
recent_orders_result = await db.execute(
select(Order)
.options(selectinload(Order.items))
.order_by(desc(Order.created_at))
.limit(10)
)
for order in recent_orders_result.scalars().all():
try:
recent_orders_data.append({
"id": order.id,
"status": _safe_enum_value(order.status),
"total": float(order.total) if order.total is not None else 0.0,
"created_at": _safe_isoformat(order.created_at),
"items": [{"id": i.id, "product_name": i.product_name, "quantity": i.quantity} for i in order.items] if order.items else []
})
except Exception as e:
logger.warning(f"Error processing order {order.id}: {e}")
except Exception as e:
logger.error(f"Error fetching recent orders: {e}")
# Build response
response = {
"stats": {
"total_products": int(counts["products"]),
"total_services": int(counts["services"]),
"total_users": int(counts["users"]),
"total_orders": int(counts["orders"]),
"total_revenue": total_revenue,
"monthly_revenue": monthly_revenue,
"today_orders": int(today_orders),
"today_revenue": today_revenue,
"pending_bookings": int(pending_bookings)
},
"low_stock_products": low_stock_products,
"recent_orders": recent_orders_data
}
logger.info(f"Dashboard data fetched successfully for user {user.id}")
return response
except Exception as e:
logger.error(f"Critical error in get_admin_dashboard: {e}", exc_info=True)
# Return safe defaults instead of crashing
return {
"stats": {
"total_products": 0,
"total_services": 0,
"total_users": 0,
"total_orders": 0,
"total_revenue": 0.0,
"monthly_revenue": 0.0,
"today_orders": 0,
"today_revenue": 0.0,
"pending_bookings": 0
},
"low_stock_products": [],
"recent_orders": [],
"error": "Failed to load some dashboard data"
}
# ================== IMAGE UPLOAD ROUTES ==================
@api_router.post("/upload/image")
async def upload_image(
file: UploadFile = File(...),
user: User = Depends(get_admin_user)
):
"""Upload a product image and return the URL"""
try:
logger.info(f"=== Image Upload Request ===")
logger.info(f"Filename: '{file.filename}'")
logger.info(f"Content-Type: {file.content_type}")
logger.info(f"File size: {file.size if hasattr(file, 'size') else 'unknown'}")
# Handle cases where filename might be None or empty
if not file.filename:
logger.error("No filename provided")
raise HTTPException(status_code=400, detail="No filename provided")
# Validate file extension (more reliable than content_type)
file_ext = Path(file.filename).suffix.lower()
logger.info(f"Extracted extension: '{file_ext}'")
# Handle cases where file has no extension - try to infer from content_type
if not file_ext and file.content_type:
content_type_map = {
'image/jpeg': '.jpg',
'image/jpg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'image/webp': '.webp',
'image/bmp': '.bmp',
'image/svg+xml': '.svg',
'image/heic': '.heic',
'image/heif': '.heif'
}
file_ext = content_type_map.get(file.content_type, '')
logger.info(f"Inferred extension from content-type: '{file_ext}'")
# Support common image formats including iPhone formats
allowed_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg', '.heic', '.heif'}
# Special handling for HEIC/HEIF - convert to JPG for better compatibility
convert_to_jpg = file_ext in {'.heic', '.heif'}
if convert_to_jpg:
logger.info(f"HEIC/HEIF file detected, will convert to JPG for compatibility")
if not file_ext or file_ext not in allowed_extensions:
logger.warning(f"Invalid or missing file extension: '{file_ext}'")
logger.warning(f"Full filename: '{file.filename}'")
raise HTTPException(
status_code=400,
detail=f"File must be an image with a valid extension. Allowed: {', '.join(sorted(allowed_extensions))}"
)
# Generate unique filename - convert HEIC to JPG extension
output_ext = '.jpg' if convert_to_jpg else file_ext
unique_filename = f"{uuid.uuid4()}{output_ext}"
file_path = UPLOAD_DIR / unique_filename
logger.info(f"Saving file to: {file_path}")
# Save file asynchronously
content = await file.read()
if len(content) == 0:
raise HTTPException(status_code=400, detail="File is empty")
with open(file_path, "wb") as buffer:
buffer.write(content)
# Return relative URL
# Convert HEIC/HEIF to JPG for better browser compatibility
if convert_to_jpg:
try:
from PIL import Image
from io import BytesIO
# Try to use pillow_heif if available
try:
import pillow_heif
pillow_heif.register_heif_opener()
except ImportError:
logger.warning("pillow_heif not installed, HEIC conversion may not work")
# Open and convert image
img = Image.open(BytesIO(content))
img = img.convert('RGB') # Ensure RGB mode for JPEG
img.save(file_path, 'JPEG', quality=90)
logger.info(f"Converted HEIC/HEIF to JPG successfully")
except Exception as e:
logger.error(f"Failed to convert HEIC/HEIF: {e}")
# Fallback: save as-is and let browser handle it
with open(file_path, "wb") as buffer:
buffer.write(content)
logger.info(f"Saved HEIC file as-is without conversion")
else:
with open(file_path, "wb") as buffer:
buffer.write(content)
# Return relative URL
image_url = f"/uploads/products/{unique_filename}"
logger.info(f"Image uploaded successfully: {image_url}, size: {len(content)} bytes")
return {"url": image_url, "filename": unique_filename}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to upload image: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to upload image: {str(e)}")
@api_router.post("/admin/products/{product_id}/images")
async def add_product_images(
product_id: str,
image_urls: List[str],
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Add multiple images to a product"""
product = await _get_or_404(db, Product, product_id, "Product not found")
# Get current max display order
result = await db.execute(
select(func.max(ProductImage.display_order))
.where(ProductImage.product_id == product_id)
)
max_order = result.scalar() or -1
# Add new images
for idx, url in enumerate(image_urls):
product_image = ProductImage(
product_id=product_id,
image_url=url,
display_order=max_order + idx + 1,
is_primary=(max_order == -1 and idx == 0) # First image is primary if no images exist
)
db.add(product_image)
await db.commit()
return {"message": f"Added {len(image_urls)} images", "product_id": product_id}
@api_router.delete("/admin/products/{product_id}/images/{image_id}")
async def delete_product_image(
product_id: str,
image_id: str,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Delete a product image"""
result = await db.execute(
select(ProductImage)
.where(and_(ProductImage.id == image_id, ProductImage.product_id == product_id))
)
image = result.scalar_one_or_none()
if not image:
raise HTTPException(status_code=404, detail="Image not found")
# Delete file from filesystem
try:
file_path = UPLOAD_DIR / Path(image.image_url).name
if file_path.exists():
file_path.unlink()
except Exception as e:
logger.warning(f"Failed to delete image file: {e}")
await db.delete(image)
await db.commit()
return {"message": "Image deleted"}
@api_router.put("/admin/products/{product_id}/images/reorder")
async def reorder_product_images(
product_id: str,
image_orders: Dict[str, int], # {image_id: display_order}
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Reorder product images"""
for image_id, order in image_orders.items():
result = await db.execute(
select(ProductImage)
.where(and_(ProductImage.id == image_id, ProductImage.product_id == product_id))
)
image = result.scalar_one_or_none()
if image:
image.display_order = order
await db.commit()
return {"message": "Images reordered"}
# Admin - Products CRUD
@api_router.get("/admin/products")
async def admin_get_products(
include_inactive: bool = False,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
query = select(Product).options(selectinload(Product.images))
if not include_inactive:
query = query.where(Product.is_active == True)
query = query.order_by(desc(Product.created_at))
result = await db.execute(query)
products = result.scalars().all()
return [product_to_dict(p) for p in products]
@api_router.post("/admin/products")
async def admin_create_product(product_data: ProductCreate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
data_dict = product_data.model_dump()
image_urls = data_dict.pop('images', [])
product = Product(**data_dict)
db.add(product)
await db.commit()
await db.refresh(product)
# Add images if provided
if image_urls:
for idx, url in enumerate(image_urls):
product_image = ProductImage(
product_id=product.id,
image_url=url,
display_order=idx,
is_primary=(idx == 0)
)
db.add(product_image)
await db.commit()
# Log inventory
inv_log = InventoryLog(
product_id=product.id,
action="add",
quantity_change=product.stock,
previous_stock=0,
new_stock=product.stock,
notes="Initial stock",
created_by=user.id
)
db.add(inv_log)
await db.commit()
# Reload product with images relationship
result = await db.execute(
select(Product)
.where(Product.id == product.id)
.options(selectinload(Product.images))
)
product = result.scalar_one()
return product_to_dict(product)
@api_router.put("/admin/products/{product_id}")
async def admin_update_product(product_id: str, product_data: ProductUpdate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
product = await _get_or_404(db, Product, product_id, "Product not found")
update_data = product_data.model_dump(exclude_unset=True)
image_urls = update_data.pop('images', None)
# Track stock changes
if "stock" in update_data and update_data["stock"] != product.stock:
inv_log = InventoryLog(
product_id=product.id,
action="adjust",
quantity_change=update_data["stock"] - product.stock,
previous_stock=product.stock,
new_stock=update_data["stock"],
notes="Manual adjustment",
created_by=user.id
)
db.add(inv_log)
for key, value in update_data.items():
setattr(product, key, value)
# Update images if provided
if image_urls is not None:
# Delete existing images
await db.execute(
delete(ProductImage).where(ProductImage.product_id == product_id)
)
# Add new images
for idx, url in enumerate(image_urls):
product_image = ProductImage(
product_id=product_id,
image_url=url,
display_order=idx,
is_primary=(idx == 0)
)
db.add(product_image)
await db.commit()
# Reload product with images relationship
result = await db.execute(
select(Product)
.where(Product.id == product_id)
.options(selectinload(Product.images))
)
product = result.scalar_one()
return product_to_dict(product)
@api_router.delete("/admin/products/{product_id}")
async def admin_delete_product(product_id: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
product = await _get_or_404(db, Product, product_id, "Product not found")
# Hard delete - remove from database
await db.delete(product)
await db.commit()
return {"message": "Product deleted"}
# Admin - Categories CRUD
@api_router.get("/admin/categories")
async def admin_get_categories(user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Category).order_by(Category.name))
categories = result.scalars().all()
return [{
"id": str(c.id),
"name": c.name,
"description": c.description,
"created_at": c.created_at.isoformat() if c.created_at else None
} for c in categories]
@api_router.post("/admin/categories")
async def admin_create_category(category_data: CategoryCreate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
data = category_data.model_dump()
# Auto-generate slug if not provided
if not data.get('slug'):
data['slug'] = data['name'].lower().replace(' ', '-').replace('&', 'and')
category = Category(**data)
db.add(category)
await db.commit()
await db.refresh(category)
return {
"id": str(category.id),
"name": category.name,
"slug": category.slug,
"description": category.description,
"created_at": category.created_at.isoformat() if category.created_at else None
}
@api_router.put("/admin/categories/{category_id}")
async def admin_update_category(category_id: str, category_data: CategoryUpdate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
category = await _get_or_404(db, Category, category_id, "Category not found")
update_data = category_data.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(category, key, value)
await db.commit()
await db.refresh(category)
return {
"id": str(category.id),
"name": category.name,
"description": category.description,
"created_at": category.created_at.isoformat() if category.created_at else None
}
@api_router.delete("/admin/categories/{category_id}")
async def admin_delete_category(category_id: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
category = await _get_or_404(db, Category, category_id, "Category not found")
await db.delete(category)
await db.commit()
return {"message": "Category deleted successfully"}
@api_router.get("/categories")
async def get_categories(db: AsyncSession = Depends(get_db)):
"""Public endpoint to get all categories"""
result = await db.execute(select(Category).order_by(Category.name))
categories = result.scalars().all()
return [{
"id": str(c.id),
"name": c.name,
"description": c.description
} for c in categories]
# Admin - Services CRUD
@api_router.get("/admin/services")
async def admin_get_services(include_inactive: bool = False, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
query = select(Service).options(selectinload(Service.images))
if not include_inactive:
query = query.where(Service.is_active == True)
query = query.order_by(desc(Service.created_at))
result = await db.execute(query)
services = result.scalars().all()
return [service_to_dict(s) for s in services]
@api_router.post("/admin/services")
async def admin_create_service(service_data: ServiceCreate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
data_dict = service_data.model_dump()
image_urls = data_dict.pop('images', [])
service = Service(**data_dict)
db.add(service)
await db.commit()
await db.refresh(service)
# Add images if provided
if image_urls:
for idx, url in enumerate(image_urls):
service_image = ServiceImage(
service_id=service.id,
image_url=url,
display_order=idx,
is_primary=(idx == 0)
)
db.add(service_image)
await db.commit()
# Reload service with images relationship
result = await db.execute(
select(Service)
.where(Service.id == service.id)
.options(selectinload(Service.images))
)
service = result.scalar_one()
return service_to_dict(service)
@api_router.put("/admin/services/{service_id}")
async def admin_update_service(service_id: str, service_data: ServiceUpdate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
service = await _get_or_404(db, Service, service_id, "Service not found")
update_data = service_data.model_dump(exclude_unset=True)
image_urls = update_data.pop('images', None)
for key, value in update_data.items():
setattr(service, key, value)
# Update images if provided
if image_urls is not None:
# Delete existing images
await db.execute(
delete(ServiceImage).where(ServiceImage.service_id == service_id)
)
# Add new images
for idx, url in enumerate(image_urls):
service_image = ServiceImage(
service_id=service_id,
image_url=url,
display_order=idx,
is_primary=(idx == 0)
)
db.add(service_image)
await db.commit()
# Reload service with images relationship
result = await db.execute(
select(Service)
.where(Service.id == service_id)
.options(selectinload(Service.images))
)
service = result.scalar_one()
return service_to_dict(service)
@api_router.delete("/admin/services/{service_id}")
async def admin_delete_service(service_id: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
service = await _get_or_404(db, Service, service_id, "Service not found")
# Hard delete - remove from database
await db.delete(service)
await db.commit()
return {"message": "Service deleted"}
# Admin - Orders Management
@api_router.get("/admin/orders")
async def admin_get_orders(
status: Optional[str] = None,
limit: int = 50,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
query = select(Order).options(selectinload(Order.items), selectinload(Order.status_history), selectinload(Order.user))
if status:
query = query.where(Order.status == OrderStatus(status))
query = query.order_by(desc(Order.created_at)).limit(limit)
result = await db.execute(query)
orders = result.scalars().all()
return [{
**order_to_dict(o),
"user_name": o.user.name if o.user else "Unknown",
"user_email": o.user.email if o.user else "Unknown"
} for o in orders]
@api_router.put("/admin/orders/{order_id}/status")
async def admin_update_order_status(order_id: str, status_data: OrderStatusUpdate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Order).where(Order.id == order_id).options(selectinload(Order.items))
)
order = result.scalar_one_or_none()
if not order:
raise HTTPException(status_code=404, detail="Order not found")
new_status = OrderStatus(status_data.status)
order.status = new_status
if status_data.tracking_number:
order.tracking_number = status_data.tracking_number
# Handle refunds - restore stock
if new_status == OrderStatus.REFUNDED:
for item in order.items:
result = await db.execute(select(Product).where(Product.id == item.product_id))
product = result.scalar_one_or_none()
if product:
previous_stock = product.stock
product.stock += item.quantity
inv_log = InventoryLog(
product_id=product.id,
action="refund",
quantity_change=item.quantity,
previous_stock=previous_stock,
new_stock=product.stock,
notes=f"Refund for order {order_id}",
created_by=user.id
)
db.add(inv_log)
# Add status history
status_history = OrderStatusHistory(
order_id=order.id,
status=new_status,
notes=status_data.notes,
created_by=user.id
)
db.add(status_history)
await db.commit()
return {"message": "Order status updated"}
# Admin - Inventory Management
@api_router.get("/admin/inventory")
async def admin_get_inventory(user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
try:
result = await db.execute(
select(Product)
.options(selectinload(Product.images))
.where(Product.is_active == True)
.order_by(Product.stock)
)
products = result.scalars().all()
inventory_data = []
for p in products:
product_dict = product_to_dict(p)
product_dict["is_low_stock"] = p.stock <= p.low_stock_threshold
inventory_data.append(product_dict)
return inventory_data
except Exception as e:
logger.error(f"Error fetching inventory: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to fetch inventory: {str(e)}")
@api_router.post("/admin/inventory/{product_id}/adjust")
async def admin_adjust_inventory(product_id: str, adjustment: InventoryAdjust, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
product = await _get_or_404(db, Product, product_id, "Product not found")
previous_stock = product.stock
product.stock = max(0, product.stock + adjustment.quantity_change)
inv_log = InventoryLog(
product_id=product.id,
action="adjust" if adjustment.quantity_change >= 0 else "remove",
quantity_change=adjustment.quantity_change,
previous_stock=previous_stock,
new_stock=product.stock,
notes=adjustment.notes,
created_by=user.id
)
db.add(inv_log)
await db.commit()
return _build_response("Inventory adjusted", new_stock=product.stock)
@api_router.get("/admin/inventory/{product_id}/logs")
async def admin_get_inventory_logs(product_id: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(InventoryLog)
.where(InventoryLog.product_id == product_id)
.order_by(desc(InventoryLog.created_at))
.limit(50)
)
logs = result.scalars().all()
return [{
"id": log.id,
"action": log.action,
"quantity_change": log.quantity_change,
"previous_stock": log.previous_stock,
"new_stock": log.new_stock,
"notes": log.notes,
"created_at": log.created_at.isoformat() if log.created_at else None
} for log in logs]
# Admin - Bookings Management
@api_router.get("/admin/bookings")
async def admin_get_bookings(status: Optional[str] = None, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
query = select(Booking).options(selectinload(Booking.service))
if status:
query = query.where(Booking.status == status)
query = query.order_by(desc(Booking.created_at))
result = await db.execute(query)
bookings = result.scalars().all()
return [booking_to_dict(b) for b in bookings]
@api_router.put("/admin/bookings/{booking_id}/status")
async def admin_update_booking_status(booking_id: str, status: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Booking).where(Booking.id == booking_id))
booking = result.scalar_one_or_none()
if not booking:
raise HTTPException(status_code=404, detail="Booking not found")
booking.status = status
await db.commit()
return {"message": "Booking status updated"}
# Admin - Users Management
# Admin - Reports
@api_router.get("/admin/reports/sales")
async def admin_get_sales_report(
period: str = "daily", # daily, weekly, monthly
start_date: Optional[str] = None,
end_date: Optional[str] = None,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
now = datetime.now(timezone.utc)
if start_date:
start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
else:
if period == "daily":
start = now - timedelta(days=30)
elif period == "weekly":
start = now - timedelta(weeks=12)
else:
start = now - timedelta(days=365)
if end_date:
end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
else:
end = now
# Get orders in date range
result = await db.execute(
select(Order)
.where(and_(Order.created_at >= start, Order.created_at <= end))
.options(selectinload(Order.items))
.order_by(Order.created_at)
)
orders = result.scalars().all()
# Get bookings in date range
bookings_result = await db.execute(
select(Booking)
.where(and_(Booking.created_at >= start, Booking.created_at <= end))
)
bookings = bookings_result.scalars().all()
# Aggregate by period
report_data = {}
for order in orders:
if period == "daily":
key = order.created_at.strftime("%Y-%m-%d")
elif period == "weekly":
key = order.created_at.strftime("%Y-W%W")
else:
key = order.created_at.strftime("%Y-%m")
if key not in report_data:
report_data[key] = {
"period": key,
"orders": 0,
"revenue": 0,
"products_sold": 0,
"order_statuses": {}
}
report_data[key]["orders"] += 1
report_data[key]["revenue"] += order.total
report_data[key]["products_sold"] += sum(item.quantity for item in order.items)
status = order.status.value if order.status else "unknown"
report_data[key]["order_statuses"][status] = report_data[key]["order_statuses"].get(status, 0) + 1
# Add booking counts
for booking in bookings:
if period == "daily":
key = booking.created_at.strftime("%Y-%m-%d")
elif period == "weekly":
key = booking.created_at.strftime("%Y-W%W")
else:
key = booking.created_at.strftime("%Y-%m")
if key not in report_data:
report_data[key] = {
"period": key,
"orders": 0,
"revenue": 0,
"products_sold": 0,
"order_statuses": {}
}
report_data[key]["services_booked"] = report_data[key].get("services_booked", 0) + 1
# Calculate totals
total_orders = len(orders)
total_revenue = sum(o.total for o in orders)
total_products = sum(sum(item.quantity for item in o.items) for o in orders)
total_bookings = len(bookings)
return {
"period": period,
"start_date": start.isoformat(),
"end_date": end.isoformat(),
"summary": {
"total_orders": total_orders,
"total_revenue": total_revenue,
"total_products_sold": total_products,
"total_services_booked": total_bookings,
"average_order_value": total_revenue / total_orders if total_orders > 0 else 0
},
"data": list(report_data.values())
}
# Admin - Export Reports
@api_router.get("/admin/reports/export/csv")
async def admin_export_csv(
report_type: str = "sales", # sales, inventory, orders
period: str = "monthly",
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
output = io.StringIO()
writer = csv.writer(output)
now = datetime.now(timezone.utc)
if report_type == "sales":
if period == "daily":
start = now - timedelta(days=30)
elif period == "weekly":
start = now - timedelta(weeks=12)
else:
start = now - timedelta(days=365)
result = await db.execute(
select(Order)
.where(Order.created_at >= start)
.options(selectinload(Order.items), selectinload(Order.user))
.order_by(Order.created_at)
)
orders = result.scalars().all()
writer.writerow(["Date", "Order ID", "Customer", "Items", "Subtotal", "Tax", "Shipping", "Total", "Status"])
for order in orders:
writer.writerow([
order.created_at.strftime("%Y-%m-%d %H:%M"),
order.id,
order.user.name if order.user else "Guest",
sum(item.quantity for item in order.items),
f"${order.subtotal:.2f}",
f"${order.tax:.2f}",
f"${order.shipping:.2f}",
f"${order.total:.2f}",
order.status.value if order.status else "unknown"
])
elif report_type == "inventory":
result = await db.execute(select(Product).where(Product.is_active == True))
products = result.scalars().all()
writer.writerow(["Product ID", "Name", "Category", "Brand", "Price", "Stock", "Low Stock Threshold", "Status"])
for product in products:
writer.writerow([
product.id,
product.name,
product.category,
product.brand,
f"${product.price:.2f}",
product.stock,
product.low_stock_threshold,
"Low Stock" if product.stock <= product.low_stock_threshold else "In Stock"
])
elif report_type == "orders":
result = await db.execute(
select(Order)
.options(selectinload(Order.items), selectinload(Order.user))
.order_by(desc(Order.created_at))
.limit(500)
)
orders = result.scalars().all()
writer.writerow(["Order ID", "Date", "Customer", "Email", "Items", "Total", "Status", "Tracking"])
for order in orders:
writer.writerow([
order.id,
order.created_at.strftime("%Y-%m-%d %H:%M"),
order.user.name if order.user else "Guest",
order.user.email if order.user else "",
sum(item.quantity for item in order.items),
f"${order.total:.2f}",
order.status.value if order.status else "unknown",
order.tracking_number or ""
])
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={report_type}_report_{now.strftime('%Y%m%d')}.csv"}
)
@api_router.get("/admin/reports/export/pdf")
async def admin_export_pdf(
report_type: str = "sales",
period: str = "monthly",
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=A4)
styles = getSampleStyleSheet()
elements = []
now = datetime.now(timezone.utc)
# Title
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=24,
spaceAfter=30
)
elements.append(Paragraph(f"TechZone {report_type.title()} Report", title_style))
elements.append(Paragraph(f"Generated: {now.strftime('%Y-%m-%d %H:%M')}", styles['Normal']))
elements.append(Spacer(1, 20))
if report_type == "sales":
if period == "daily":
start = now - timedelta(days=30)
elif period == "weekly":
start = now - timedelta(weeks=12)
else:
start = now - timedelta(days=365)
result = await db.execute(
select(Order)
.where(Order.created_at >= start)
.options(selectinload(Order.items))
)
orders = result.scalars().all()
# Summary
total_orders = len(orders)
total_revenue = sum(o.total for o in orders)
total_products = sum(sum(item.quantity for item in o.items) for o in orders)
elements.append(Paragraph("Summary", styles['Heading2']))
summary_data = [
["Metric", "Value"],
["Total Orders", str(total_orders)],
["Total Revenue", f"${total_revenue:.2f}"],
["Products Sold", str(total_products)],
["Average Order Value", f"${total_revenue/total_orders:.2f}" if total_orders > 0 else "$0.00"]
]
summary_table = Table(summary_data, colWidths=[3*inch, 2*inch])
summary_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
elements.append(summary_table)
elements.append(Spacer(1, 20))
# Orders table
elements.append(Paragraph("Recent Orders", styles['Heading2']))
orders_data = [["Date", "Order ID", "Items", "Total", "Status"]]
for order in orders[:50]:
orders_data.append([
order.created_at.strftime("%Y-%m-%d"),
order.id[:8] + "...",
str(sum(item.quantity for item in order.items)),
f"${order.total:.2f}",
order.status.value if order.status else "unknown"
])
orders_table = Table(orders_data, colWidths=[1.2*inch, 1.2*inch, 0.8*inch, 1*inch, 1*inch])
orders_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 9),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('GRID', (0, 0), (-1, -1), 0.5, colors.black)
]))
elements.append(orders_table)
elif report_type == "inventory":
result = await db.execute(select(Product).where(Product.is_active == True).order_by(Product.stock))
products = result.scalars().all()
elements.append(Paragraph("Inventory Status", styles['Heading2']))
inv_data = [["Product", "Category", "Price", "Stock", "Status"]]
for product in products:
status = "LOW STOCK" if product.stock <= product.low_stock_threshold else "In Stock"
inv_data.append([
product.name[:30] + "..." if len(product.name) > 30 else product.name,
product.category,
f"${product.price:.2f}",
str(product.stock),
status
])
inv_table = Table(inv_data, colWidths=[2*inch, 1*inch, 0.8*inch, 0.6*inch, 0.8*inch])
inv_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 8),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('GRID', (0, 0), (-1, -1), 0.5, colors.black)
]))
elements.append(inv_table)
doc.build(elements)
buffer.seek(0)
return StreamingResponse(
buffer,
media_type="application/pdf",
headers={"Content-Disposition": f"attachment; filename={report_type}_report_{now.strftime('%Y%m%d')}.pdf"}
)
# ================== USER MANAGEMENT ==================
class UserCreateAdmin(BaseModel):
email: EmailStr
name: str
password: str
role: str
is_active: bool = True
password_never_change: bool = False
class UserUpdateAdmin(BaseModel):
email: Optional[EmailStr] = None
name: Optional[str] = None
password: Optional[str] = None # Allow password updates
role: Optional[str] = None
is_active: Optional[bool] = None
password_never_change: Optional[bool] = None
@api_router.get("/admin/users")
async def get_all_users(
skip: int = 0,
limit: int = 20,
search: str = "",
role: str = "",
status: str = "",
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Get all users with filters"""
query = select(User)
# Apply search filter
if search:
query = query.where(
or_(
User.name.ilike(f"%{search}%"),
User.email.ilike(f"%{search}%")
)
)
# Apply role filter
if role:
try:
role_enum = UserRole[role.upper()]
query = query.where(User.role == role_enum)
except KeyError:
pass
# Apply status filter
if status == "active":
query = query.where(User.is_active == True)
elif status == "inactive":
query = query.where(User.is_active == False)
# Get total count
count_result = await db.execute(select(func.count()).select_from(query.subquery()))
total = count_result.scalar()
# Apply pagination
query = query.offset(skip).limit(limit).order_by(User.created_at.desc())
result = await db.execute(query)
users = result.scalars().all()
return {
"users": [user_to_dict(user) for user in users],
"total": total,
"skip": skip,
"limit": limit
}
@api_router.post("/admin/users")
async def create_user(
user_data: UserCreateAdmin,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Create a new user (admin only)"""
# Check if email already exists
result = await db.execute(select(User).where(User.email == user_data.email))
if result.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Email already registered")
# Validate role
try:
role_enum = UserRole[user_data.role.upper()]
except KeyError:
raise HTTPException(status_code=400, detail=f"Invalid role: {user_data.role}")
# Create new user
new_user = User(
email=user_data.email,
name=user_data.name,
password=hash_password(user_data.password),
role=role_enum,
is_active=user_data.is_active
)
db.add(new_user)
await db.commit()
await db.refresh(new_user)
logger.info(f"Admin {admin.email} created user {new_user.email} with role {role_enum.value}")
return {
"message": "User created successfully",
"user": user_to_dict(new_user)
}
@api_router.put("/admin/users/{user_id}")
async def update_user(
user_id: str,
user_data: UserUpdateAdmin,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Update user details (admin only)"""
user = await _get_or_404(db, User, user_id, "User not found")
# Update fields if provided
if user_data.email is not None:
# Check if new email already exists
result = await db.execute(
select(User).where(User.email == user_data.email, User.id != user_id)
)
if result.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Email already in use")
user.email = user_data.email
if user_data.name is not None:
user.name = user_data.name
if user_data.password is not None and user_data.password.strip():
# Only update password if provided and not empty
user.password = hash_password(user_data.password)
if user_data.role is not None:
try:
role_enum = UserRole[user_data.role.upper()]
user.role = role_enum
except KeyError:
raise HTTPException(status_code=400, detail=f"Invalid role: {user_data.role}")
if user_data.is_active is not None:
user.is_active = user_data.is_active
await db.commit()
await db.refresh(user)
logger.info(f"Admin {admin.email} updated user {user.email}")
return {
"message": "User updated successfully",
"user": user_to_dict(user)
}
@api_router.put("/admin/users/{user_id}/toggle-active")
async def toggle_user_active(
user_id: str,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Toggle user active status"""
user = await _get_or_404(db, User, user_id, "User not found")
# Prevent deactivating yourself
if user.id == admin.id:
raise HTTPException(status_code=400, detail="Cannot deactivate your own account")
user.is_active = not user.is_active
await db.commit()
await db.refresh(user)
status = "activated" if user.is_active else "deactivated"
logger.info(f"Admin {admin.email} {status} user {user.email}")
return {
"message": f"User {status} successfully",
"user": user_to_dict(user)
}
@api_router.delete("/admin/users/{user_id}")
async def delete_user(
user_id: str,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Delete a user (admin only)"""
user = await _get_or_404(db, User, user_id, "User not found")
# Prevent deleting yourself
if user.id == admin.id:
raise HTTPException(status_code=400, detail="Cannot delete your own account")
await db.delete(user)
await db.commit()
logger.info(f"Admin {admin.email} deleted user {user.email}")
return {"message": "User deleted successfully"}
# ================== ABOUT PAGE CMS ==================
# Pydantic schemas for About page
class AboutContentCreate(BaseModel):
section: str
title: Optional[str] = None
subtitle: Optional[str] = None
content: Optional[str] = None
image_url: Optional[str] = None
data: Optional[dict] = None
display_order: int = 0
is_active: bool = True
class AboutContentUpdate(BaseModel):
title: Optional[str] = None
subtitle: Optional[str] = None
content: Optional[str] = None
image_url: Optional[str] = None
data: Optional[dict] = None
display_order: Optional[int] = None
is_active: Optional[bool] = None
class TeamMemberCreate(BaseModel):
name: str
role: str
bio: Optional[str] = None
image_url: Optional[str] = None
email: Optional[str] = None
linkedin: Optional[str] = None
display_order: int = 0
is_active: bool = True
class TeamMemberUpdate(BaseModel):
name: Optional[str] = None
role: Optional[str] = None
bio: Optional[str] = None
image_url: Optional[str] = None
email: Optional[str] = None
linkedin: Optional[str] = None
display_order: Optional[int] = None
is_active: Optional[bool] = None
class CompanyValueCreate(BaseModel):
title: str
description: str
icon: str
display_order: int = 0
is_active: bool = True
class CompanyValueUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
icon: Optional[str] = None
display_order: Optional[int] = None
is_active: Optional[bool] = None
# Public endpoints (no auth required)
@api_router.get("/about/content")
async def get_about_content(db: AsyncSession = Depends(get_db)):
"""Get all active about content sections"""
result = await db.execute(
select(AboutContent)
.where(AboutContent.is_active == True)
.order_by(AboutContent.section, AboutContent.display_order)
)
content = result.scalars().all()
return [{
"id": str(item.id),
"section": item.section,
"title": item.title,
"subtitle": item.subtitle,
"content": item.content,
"image_url": item.image_url,
"data": item.data,
"display_order": item.display_order,
"created_at": item.created_at.isoformat() if item.created_at else None,
"updated_at": item.updated_at.isoformat() if item.updated_at else None
} for item in content]
@api_router.get("/about/team")
async def get_team_members(db: AsyncSession = Depends(get_db)):
"""Get all active team members"""
result = await db.execute(
select(TeamMember)
.where(TeamMember.is_active == True)
.order_by(TeamMember.display_order)
)
members = result.scalars().all()
return [{
"id": str(member.id),
"name": member.name,
"role": member.role,
"bio": member.bio,
"image_url": member.image_url,
"email": member.email,
"linkedin": member.linkedin,
"display_order": member.display_order
} for member in members]
@api_router.get("/about/values")
async def get_company_values(db: AsyncSession = Depends(get_db)):
"""Get all active company values"""
result = await db.execute(
select(CompanyValue)
.where(CompanyValue.is_active == True)
.order_by(CompanyValue.display_order)
)
values = result.scalars().all()
return [{
"id": str(value.id),
"title": value.title,
"description": value.description,
"icon": value.icon,
"display_order": value.display_order
} for value in values]
# Admin endpoints (auth required)
@api_router.get("/admin/about/content")
async def admin_get_all_about_content(
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Get all about content sections (including inactive)"""
result = await db.execute(
select(AboutContent).order_by(AboutContent.section, AboutContent.display_order)
)
content = result.scalars().all()
return [{
"id": str(item.id),
"section": item.section,
"title": item.title,
"subtitle": item.subtitle,
"content": item.content,
"image_url": item.image_url,
"data": item.data,
"display_order": item.display_order,
"is_active": item.is_active,
"created_at": item.created_at.isoformat() if item.created_at else None,
"updated_at": item.updated_at.isoformat() if item.updated_at else None
} for item in content]
@api_router.post("/admin/about/content")
async def admin_create_about_content(
content_data: AboutContentCreate,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Create new about content section"""
content = AboutContent(
section=content_data.section,
title=content_data.title,
subtitle=content_data.subtitle,
content=content_data.content,
image_url=content_data.image_url,
data=content_data.data,
display_order=content_data.display_order,
is_active=content_data.is_active
)
db.add(content)
await db.commit()
await db.refresh(content)
logger.info(f"Admin {admin.email} created about content section: {content.section}")
return {
"id": str(content.id),
"section": content.section,
"title": content.title,
"subtitle": content.subtitle,
"content": content.content,
"image_url": content.image_url,
"data": content.data,
"display_order": content.display_order,
"is_active": content.is_active,
"created_at": content.created_at.isoformat() if content.created_at else None,
"updated_at": content.updated_at.isoformat() if content.updated_at else None
}
@api_router.put("/admin/about/content/{content_id}")
async def admin_update_about_content(
content_id: str,
content_data: AboutContentUpdate,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Update about content section"""
try:
content_uuid = uuid.UUID(content_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid content ID format")
result = await db.execute(select(AboutContent).where(AboutContent.id == content_uuid))
content = result.scalar_one_or_none()
if not content:
raise HTTPException(status_code=404, detail="Content not found")
# Update fields
if content_data.title is not None:
content.title = content_data.title
if content_data.subtitle is not None:
content.subtitle = content_data.subtitle
if content_data.content is not None:
content.content = content_data.content
if content_data.image_url is not None:
content.image_url = content_data.image_url
if content_data.data is not None:
content.data = content_data.data
if content_data.display_order is not None:
content.display_order = content_data.display_order
if content_data.is_active is not None:
content.is_active = content_data.is_active
content.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(content)
logger.info(f"Admin {admin.email} updated about content: {content.section}")
return {
"id": str(content.id),
"section": content.section,
"title": content.title,
"subtitle": content.subtitle,
"content": content.content,
"image_url": content.image_url,
"data": content.data,
"display_order": content.display_order,
"is_active": content.is_active,
"updated_at": content.updated_at.isoformat() if content.updated_at else None
}
@api_router.delete("/admin/about/content/{content_id}")
async def admin_delete_about_content(
content_id: str,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Delete about content section"""
try:
content_uuid = uuid.UUID(content_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid content ID format")
result = await db.execute(select(AboutContent).where(AboutContent.id == content_uuid))
content = result.scalar_one_or_none()
if not content:
raise HTTPException(status_code=404, detail="Content not found")
await db.delete(content)
await db.commit()
logger.info(f"Admin {admin.email} deleted about content: {content.section}")
return {"message": "Content deleted successfully"}
# Team Members CRUD
@api_router.get("/admin/about/team")
async def admin_get_all_team_members(
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Get all team members (including inactive)"""
result = await db.execute(
select(TeamMember).order_by(TeamMember.display_order)
)
members = result.scalars().all()
return [{
"id": str(member.id),
"name": member.name,
"role": member.role,
"bio": member.bio,
"image_url": member.image_url,
"email": member.email,
"linkedin": member.linkedin,
"display_order": member.display_order,
"is_active": member.is_active,
"created_at": member.created_at.isoformat() if member.created_at else None,
"updated_at": member.updated_at.isoformat() if member.updated_at else None
} for member in members]
@api_router.post("/admin/about/team")
async def admin_create_team_member(
member_data: TeamMemberCreate,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Create new team member"""
member = TeamMember(
name=member_data.name,
role=member_data.role,
bio=member_data.bio,
image_url=member_data.image_url,
email=member_data.email,
linkedin=member_data.linkedin,
display_order=member_data.display_order,
is_active=member_data.is_active
)
db.add(member)
await db.commit()
await db.refresh(member)
logger.info(f"Admin {admin.email} created team member: {member.name}")
return {
"id": str(member.id),
"name": member.name,
"role": member.role,
"bio": member.bio,
"image_url": member.image_url,
"email": member.email,
"linkedin": member.linkedin,
"display_order": member.display_order,
"is_active": member.is_active,
"created_at": member.created_at.isoformat() if member.created_at else None
}
@api_router.put("/admin/about/team/{member_id}")
async def admin_update_team_member(
member_id: str,
member_data: TeamMemberUpdate,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Update team member"""
try:
member_uuid = uuid.UUID(member_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid member ID format")
result = await db.execute(select(TeamMember).where(TeamMember.id == member_uuid))
member = result.scalar_one_or_none()
if not member:
raise HTTPException(status_code=404, detail="Team member not found")
# Update fields
if member_data.name is not None:
member.name = member_data.name
if member_data.role is not None:
member.role = member_data.role
if member_data.bio is not None:
member.bio = member_data.bio
if member_data.image_url is not None:
member.image_url = member_data.image_url
if member_data.email is not None:
member.email = member_data.email
if member_data.linkedin is not None:
member.linkedin = member_data.linkedin
if member_data.display_order is not None:
member.display_order = member_data.display_order
if member_data.is_active is not None:
member.is_active = member_data.is_active
member.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(member)
logger.info(f"Admin {admin.email} updated team member: {member.name}")
return {
"id": str(member.id),
"name": member.name,
"role": member.role,
"bio": member.bio,
"image_url": member.image_url,
"email": member.email,
"linkedin": member.linkedin,
"display_order": member.display_order,
"is_active": member.is_active,
"updated_at": member.updated_at.isoformat() if member.updated_at else None
}
@api_router.delete("/admin/about/team/{member_id}")
async def admin_delete_team_member(
member_id: str,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Delete team member"""
try:
member_uuid = uuid.UUID(member_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid member ID format")
result = await db.execute(select(TeamMember).where(TeamMember.id == member_uuid))
member = result.scalar_one_or_none()
if not member:
raise HTTPException(status_code=404, detail="Team member not found")
await db.delete(member)
await db.commit()
logger.info(f"Admin {admin.email} deleted team member: {member.name}")
return {"message": "Team member deleted successfully"}
# Company Values CRUD
@api_router.get("/admin/about/values")
async def admin_get_all_company_values(
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Get all company values (including inactive)"""
result = await db.execute(
select(CompanyValue).order_by(CompanyValue.display_order)
)
values = result.scalars().all()
return [{
"id": str(value.id),
"title": value.title,
"description": value.description,
"icon": value.icon,
"display_order": value.display_order,
"is_active": value.is_active,
"created_at": value.created_at.isoformat() if value.created_at else None,
"updated_at": value.updated_at.isoformat() if value.updated_at else None
} for value in values]
@api_router.post("/admin/about/values")
async def admin_create_company_value(
value_data: CompanyValueCreate,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Create new company value"""
value = CompanyValue(
title=value_data.title,
description=value_data.description,
icon=value_data.icon,
display_order=value_data.display_order,
is_active=value_data.is_active
)
db.add(value)
await db.commit()
await db.refresh(value)
logger.info(f"Admin {admin.email} created company value: {value.title}")
return {
"id": str(value.id),
"title": value.title,
"description": value.description,
"icon": value.icon,
"display_order": value.display_order,
"is_active": value.is_active,
"created_at": value.created_at.isoformat() if value.created_at else None
}
@api_router.put("/admin/about/values/{value_id}")
async def admin_update_company_value(
value_id: str,
value_data: CompanyValueUpdate,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Update company value"""
try:
value_uuid = uuid.UUID(value_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid value ID format")
result = await db.execute(select(CompanyValue).where(CompanyValue.id == value_uuid))
value = result.scalar_one_or_none()
if not value:
raise HTTPException(status_code=404, detail="Company value not found")
# Update fields
if value_data.title is not None:
value.title = value_data.title
if value_data.description is not None:
value.description = value_data.description
if value_data.icon is not None:
value.icon = value_data.icon
if value_data.display_order is not None:
value.display_order = value_data.display_order
if value_data.is_active is not None:
value.is_active = value_data.is_active
value.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(value)
logger.info(f"Admin {admin.email} updated company value: {value.title}")
return {
"id": str(value.id),
"title": value.title,
"description": value.description,
"icon": value.icon,
"display_order": value.display_order,
"is_active": value.is_active,
"updated_at": value.updated_at.isoformat() if value.updated_at else None
}
@api_router.delete("/admin/about/values/{value_id}")
async def admin_delete_company_value(
value_id: str,
admin: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
"""Delete company value"""
try:
value_uuid = uuid.UUID(value_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid value ID format")
result = await db.execute(select(CompanyValue).where(CompanyValue.id == value_uuid))
value = result.scalar_one_or_none()
if not value:
raise HTTPException(status_code=404, detail="Company value not found")
await db.delete(value)
await db.commit()
logger.info(f"Admin {admin.email} deleted company value: {value.title}")
return {"message": "Company value deleted successfully"}
# ================== SEED DATA ==================
@api_router.post("/seed")
async def seed_data(db: AsyncSession = Depends(get_db)):
# Check if data exists
result = await db.execute(select(func.count(Product.id)))
if result.scalar() > 0:
return {"message": "Data already seeded"}
# Create admin user
admin = User(
email="admin@techzone.com",
name="Admin",
password=hash_password("admin123"),
role=UserRole.ADMIN
)
db.add(admin)
# Create products
products = [
Product(name="MacBook Pro 16\"", description="Powerful laptop with M3 Pro chip, 18GB RAM, 512GB SSD.", price=2499.99, category="laptops", image_url="https://images.unsplash.com/photo-1517336714731-489689fd1ca8?w=800", stock=15, brand="Apple", specs={"processor": "M3 Pro", "ram": "18GB", "storage": "512GB SSD"}),
Product(name="Dell XPS 15", description="Ultra-thin laptop with Intel Core i7, 16GB RAM, stunning OLED display.", price=1799.99, category="laptops", image_url="https://images.unsplash.com/photo-1593642632559-0c6d3fc62b89?w=800", stock=20, brand="Dell", specs={"processor": "Intel i7", "ram": "16GB", "storage": "512GB SSD"}),
Product(name="iPhone 15 Pro Max", description="Latest iPhone with titanium design, A17 Pro chip, 48MP camera.", price=1199.99, category="phones", image_url="https://images.unsplash.com/photo-1695048133142-1a20484d2569?w=800", stock=30, brand="Apple", specs={"display": "6.7\" OLED", "camera": "48MP", "storage": "256GB"}),
Product(name="Samsung Galaxy S24 Ultra", description="Premium Android phone with S Pen, 200MP camera, AI features.", price=1299.99, category="phones", image_url="https://images.unsplash.com/photo-1610945265064-0e34e5519bbf?w=800", stock=25, brand="Samsung", specs={"display": "6.8\" AMOLED", "camera": "200MP", "storage": "512GB"}),
Product(name="Sony WH-1000XM5", description="Industry-leading noise cancellation, 30-hour battery.", price=349.99, category="accessories", image_url="https://images.unsplash.com/photo-1505740420928-5e560c06d30e?w=800", stock=40, brand="Sony", specs={"type": "Over-ear", "battery": "30 hours"}),
Product(name="iPad Pro 12.9\"", description="Powerful tablet with M2 chip, Liquid Retina XDR display.", price=1099.99, category="tablets", image_url="https://images.unsplash.com/photo-1544244015-0df4b3ffc6b0?w=800", stock=18, brand="Apple", specs={"processor": "M2", "display": "12.9\" XDR"}),
Product(name="Apple Watch Ultra 2", description="Rugged smartwatch with titanium case, GPS, 36-hour battery.", price=799.99, category="wearables", image_url="https://images.unsplash.com/photo-1434493789847-2f02dc6ca35d?w=800", stock=22, brand="Apple", specs={"display": "49mm", "battery": "36 hours"}),
Product(name="Logitech MX Master 3S", description="Premium wireless mouse with 8K DPI sensor, silent clicks.", price=99.99, category="accessories", image_url="https://images.unsplash.com/photo-1527864550417-7fd91fc51a46?w=800", stock=50, brand="Logitech", specs={"sensor": "8K DPI", "battery": "70 days"}),
]
for p in products:
db.add(p)
# Create services
services = [
Service(name="Screen Repair", description="Professional screen replacement for phones, tablets, and laptops.", price=149.99, duration="1-2 hours", image_url="https://images.unsplash.com/photo-1581092918056-0c4c3acd3789?w=800", category="repair"),
Service(name="Battery Replacement", description="Restore your device's battery life with genuine replacement.", price=79.99, duration="30-60 mins", image_url="https://images.unsplash.com/photo-1609091839311-d5365f9ff1c5?w=800", category="repair"),
Service(name="Data Recovery", description="Professional data recovery from damaged drives.", price=199.99, duration="2-5 days", image_url="https://images.unsplash.com/photo-1558494949-ef010cbdcc31?w=800", category="data"),
Service(name="Virus Removal", description="Complete malware and virus removal with system optimization.", price=89.99, duration="1-3 hours", image_url="https://images.unsplash.com/photo-1526374965328-7f61d4dc18c5?w=800", category="software"),
Service(name="Hardware Upgrade", description="Upgrade your RAM, SSD, or other components.", price=49.99, duration="1-2 hours", image_url="https://images.unsplash.com/photo-1591799265444-d66432b91588?w=800", category="upgrade"),
Service(name="Device Setup", description="Complete setup service for new devices including data transfer.", price=59.99, duration="1-2 hours", image_url="https://images.unsplash.com/photo-1531297484001-80022131f5a1?w=800", category="setup"),
]
for s in services:
db.add(s)
await db.commit()
return {"message": "Data seeded successfully"}
# ================== ROOT ==================
@api_router.get("/")
async def root():
return {"message": "TechZone API is running", "version": "2.0.0", "status": "healthy"}
@api_router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""Comprehensive health check endpoint"""
try:
# Test database connection
result = await db.execute(select(func.count(User.id)))
user_count = result.scalar()
return {
"status": "healthy",
"database": "connected",
"api_version": "2.0.0",
"timestamp": datetime.now(timezone.utc).isoformat()
}
except Exception as e:
logger.error(f"Health check failed: {e}")
return {
"status": "degraded",
"database": "error",
"error": str(e),
"timestamp": datetime.now(timezone.utc).isoformat()
}
# Include the router
app.include_router(api_router)
# Mount static files for uploaded images
app.mount("/uploads", StaticFiles(directory=str(ROOT_DIR / "uploads")), name="uploads")
app.add_middleware(
CORSMiddleware,
allow_credentials=True,
allow_origins=os.environ.get('CORS_ORIGINS', '*').split(','),
allow_methods=["*"],
allow_headers=["*"],
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8181)