Files

1475 lines
56 KiB
Python

from fastapi import FastAPI, APIRouter, HTTPException, Depends, status, Query, Response
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import StreamingResponse
from dotenv import load_dotenv
from starlette.middleware.cors import CORSMiddleware
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_, or_, desc, asc
from sqlalchemy.orm import selectinload
import os
import logging
from pathlib import Path
from pydantic import BaseModel, Field, EmailStr, ConfigDict
from typing import List, Optional, Dict, Any
import uuid
from datetime import datetime, timezone, timedelta
import bcrypt
import jwt
import io
import csv
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter, A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from database import get_db, init_db, AsyncSessionLocal
from models import (
User, Product, Service, CartItem, Order, OrderItem, OrderStatusHistory,
Review, Booking, Contact, InventoryLog, Category, SalesReport,
OrderStatus, UserRole, Base
)
ROOT_DIR = Path(__file__).parent
load_dotenv(ROOT_DIR / '.env')
# JWT Configuration
SECRET_KEY = os.environ.get('JWT_SECRET', 'techzone-super-secret-key-2024-production')
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24
# Create the main app
app = FastAPI(title="TechZone API", version="2.0.0")
# Create a router with the /api prefix
api_router = APIRouter(prefix="/api")
security = HTTPBearer()
# ================== PYDANTIC MODELS ==================
class UserCreate(BaseModel):
email: EmailStr
name: str
password: str
class UserLogin(BaseModel):
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
user: dict
class ProductCreate(BaseModel):
name: str
description: str
price: float
category: str
image_url: str
stock: int = 10
low_stock_threshold: int = 5
brand: str = ""
specs: dict = {}
class ProductUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
price: Optional[float] = None
category: Optional[str] = None
image_url: Optional[str] = None
stock: Optional[int] = None
low_stock_threshold: Optional[int] = None
brand: Optional[str] = None
specs: Optional[dict] = None
is_active: Optional[bool] = None
class ServiceCreate(BaseModel):
name: str
description: str
price: float
duration: str
image_url: str
category: str
class ServiceUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
price: Optional[float] = None
duration: Optional[str] = None
image_url: Optional[str] = None
category: Optional[str] = None
is_active: Optional[bool] = None
class CartItemCreate(BaseModel):
product_id: str
quantity: int = 1
class OrderCreate(BaseModel):
shipping_address: dict = {}
notes: str = ""
class OrderStatusUpdate(BaseModel):
status: str
notes: str = ""
tracking_number: Optional[str] = None
class ReviewCreate(BaseModel):
product_id: Optional[str] = None
service_id: Optional[str] = None
rating: int
title: str = ""
comment: str = ""
class BookingCreate(BaseModel):
service_id: str
name: str
email: EmailStr
phone: str
preferred_date: str
notes: str = ""
class ContactCreate(BaseModel):
name: str
email: EmailStr
subject: str
message: str
class InventoryAdjust(BaseModel):
quantity_change: int
notes: str = ""
# ================== HELPERS ==================
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def create_access_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), db: AsyncSession = Depends(get_db)):
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
async def get_admin_user(user: User = Depends(get_current_user)):
if user.role != UserRole.ADMIN:
raise HTTPException(status_code=403, detail="Admin access required")
return user
async def get_optional_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False)), db: AsyncSession = Depends(get_db)):
if credentials is None:
return None
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if user_id:
result = await db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
except:
pass
return None
def user_to_dict(user: User) -> dict:
return {
"id": user.id,
"email": user.email,
"name": user.name,
"role": user.role.value if user.role else "user",
"created_at": user.created_at.isoformat() if user.created_at else None
}
def product_to_dict(product: Product, include_reviews: bool = False) -> dict:
data = {
"id": product.id,
"name": product.name,
"description": product.description,
"price": product.price,
"category": product.category,
"image_url": product.image_url,
"stock": product.stock,
"low_stock_threshold": product.low_stock_threshold,
"brand": product.brand,
"specs": product.specs or {},
"is_active": product.is_active,
"created_at": product.created_at.isoformat() if product.created_at else None
}
if include_reviews and product.reviews:
data["reviews"] = [review_to_dict(r) for r in product.reviews]
data["average_rating"] = sum(r.rating for r in product.reviews) / len(product.reviews) if product.reviews else 0
data["review_count"] = len(product.reviews)
return data
def service_to_dict(service: Service, include_reviews: bool = False) -> dict:
data = {
"id": service.id,
"name": service.name,
"description": service.description,
"price": service.price,
"duration": service.duration,
"image_url": service.image_url,
"category": service.category,
"is_active": service.is_active,
"created_at": service.created_at.isoformat() if service.created_at else None
}
if include_reviews and service.reviews:
data["reviews"] = [review_to_dict(r) for r in service.reviews]
data["average_rating"] = sum(r.rating for r in service.reviews) / len(service.reviews) if service.reviews else 0
data["review_count"] = len(service.reviews)
return data
def order_to_dict(order: Order) -> dict:
return {
"id": order.id,
"user_id": order.user_id,
"status": order.status.value if order.status else "pending",
"subtotal": order.subtotal,
"tax": order.tax,
"shipping": order.shipping,
"total": order.total,
"shipping_address": order.shipping_address or {},
"notes": order.notes,
"tracking_number": order.tracking_number,
"created_at": order.created_at.isoformat() if order.created_at else None,
"updated_at": order.updated_at.isoformat() if order.updated_at else None,
"items": [order_item_to_dict(item) for item in order.items] if order.items else [],
"status_history": [status_history_to_dict(h) for h in order.status_history] if order.status_history else []
}
def order_item_to_dict(item: OrderItem) -> dict:
return {
"id": item.id,
"product_id": item.product_id,
"product_name": item.product_name,
"product_image": item.product_image,
"quantity": item.quantity,
"price": item.price
}
def status_history_to_dict(history: OrderStatusHistory) -> dict:
return {
"id": history.id,
"status": history.status.value if history.status else None,
"notes": history.notes,
"created_at": history.created_at.isoformat() if history.created_at else None
}
def review_to_dict(review: Review) -> dict:
return {
"id": review.id,
"user_id": review.user_id,
"user_name": review.user.name if review.user else "Anonymous",
"product_id": review.product_id,
"service_id": review.service_id,
"rating": review.rating,
"title": review.title,
"comment": review.comment,
"is_verified_purchase": review.is_verified_purchase,
"created_at": review.created_at.isoformat() if review.created_at else None
}
def booking_to_dict(booking: Booking) -> dict:
return {
"id": booking.id,
"service_id": booking.service_id,
"service_name": booking.service_name,
"name": booking.name,
"email": booking.email,
"phone": booking.phone,
"preferred_date": booking.preferred_date,
"notes": booking.notes,
"status": booking.status,
"created_at": booking.created_at.isoformat() if booking.created_at else None
}
# ================== AUTH ROUTES ==================
@api_router.post("/auth/register", response_model=TokenResponse)
async def register(user_data: UserCreate, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.email == user_data.email))
if result.scalar_one_or_none():
raise HTTPException(status_code=400, detail="Email already registered")
user = User(
email=user_data.email,
name=user_data.name,
password=hash_password(user_data.password),
role=UserRole.USER
)
db.add(user)
await db.commit()
await db.refresh(user)
token = create_access_token({"sub": user.id})
return TokenResponse(access_token=token, user=user_to_dict(user))
@api_router.post("/auth/login", response_model=TokenResponse)
async def login(credentials: UserLogin, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.email == credentials.email))
user = result.scalar_one_or_none()
if not user or not verify_password(credentials.password, user.password):
raise HTTPException(status_code=401, detail="Invalid credentials")
token = create_access_token({"sub": user.id})
return TokenResponse(access_token=token, user=user_to_dict(user))
@api_router.get("/auth/me")
async def get_me(user: User = Depends(get_current_user)):
return user_to_dict(user)
# ================== PRODUCTS ROUTES ==================
@api_router.get("/products")
async def get_products(
category: Optional[str] = None,
search: Optional[str] = None,
min_price: Optional[float] = None,
max_price: Optional[float] = None,
in_stock: Optional[bool] = None,
db: AsyncSession = Depends(get_db)
):
query = select(Product).where(Product.is_active == True)
if category and category != "all":
query = query.where(Product.category == category)
if search:
query = query.where(
or_(
Product.name.ilike(f"%{search}%"),
Product.description.ilike(f"%{search}%"),
Product.brand.ilike(f"%{search}%")
)
)
if min_price is not None:
query = query.where(Product.price >= min_price)
if max_price is not None:
query = query.where(Product.price <= max_price)
if in_stock:
query = query.where(Product.stock > 0)
query = query.options(selectinload(Product.reviews).selectinload(Review.user))
result = await db.execute(query)
products = result.scalars().all()
return [product_to_dict(p, include_reviews=True) for p in products]
@api_router.get("/products/{product_id}")
async def get_product(product_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Product)
.where(Product.id == product_id)
.options(selectinload(Product.reviews).selectinload(Review.user))
)
product = result.scalar_one_or_none()
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product_to_dict(product, include_reviews=True)
@api_router.get("/products/categories/list")
async def get_product_categories(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product.category).distinct())
categories = [row[0] for row in result.fetchall()]
return categories
# ================== SERVICES ROUTES ==================
@api_router.get("/services")
async def get_services(category: Optional[str] = None, db: AsyncSession = Depends(get_db)):
query = select(Service).where(Service.is_active == True)
if category and category != "all":
query = query.where(Service.category == category)
query = query.options(selectinload(Service.reviews).selectinload(Review.user))
result = await db.execute(query)
services = result.scalars().all()
return [service_to_dict(s, include_reviews=True) for s in services]
@api_router.get("/services/{service_id}")
async def get_service(service_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Service)
.where(Service.id == service_id)
.options(selectinload(Service.reviews).selectinload(Review.user))
)
service = result.scalar_one_or_none()
if not service:
raise HTTPException(status_code=404, detail="Service not found")
return service_to_dict(service, include_reviews=True)
@api_router.post("/services/book")
async def book_service(
booking_data: BookingCreate,
user: Optional[User] = Depends(get_optional_user),
db: AsyncSession = Depends(get_db)
):
result = await db.execute(select(Service).where(Service.id == booking_data.service_id))
service = result.scalar_one_or_none()
if not service:
raise HTTPException(status_code=404, detail="Service not found")
booking = Booking(
service_id=booking_data.service_id,
user_id=user.id if user else None,
name=booking_data.name,
email=booking_data.email,
phone=booking_data.phone,
preferred_date=booking_data.preferred_date,
notes=booking_data.notes,
service_name=service.name
)
db.add(booking)
await db.commit()
return {"message": "Booking created successfully", "booking_id": booking.id}
# ================== CART ROUTES ==================
@api_router.get("/cart")
async def get_cart(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(CartItem)
.where(CartItem.user_id == user.id)
.options(selectinload(CartItem.product))
)
cart_items = result.scalars().all()
return [{
"id": item.id,
"product_id": item.product_id,
"quantity": item.quantity,
"product": product_to_dict(item.product) if item.product else None
} for item in cart_items]
@api_router.post("/cart/add")
async def add_to_cart(item: CartItemCreate, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product).where(Product.id == item.product_id))
product = result.scalar_one_or_none()
if not product:
raise HTTPException(status_code=404, detail="Product not found")
result = await db.execute(
select(CartItem).where(
and_(CartItem.user_id == user.id, CartItem.product_id == item.product_id)
)
)
existing = result.scalar_one_or_none()
if existing:
existing.quantity += item.quantity
else:
cart_item = CartItem(user_id=user.id, product_id=item.product_id, quantity=item.quantity)
db.add(cart_item)
await db.commit()
return {"message": "Item added to cart"}
@api_router.put("/cart/{item_id}")
async def update_cart_item(item_id: str, quantity: int = Query(...), user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(CartItem).where(and_(CartItem.id == item_id, CartItem.user_id == user.id))
)
item = result.scalar_one_or_none()
if not item:
raise HTTPException(status_code=404, detail="Cart item not found")
if quantity <= 0:
await db.delete(item)
else:
item.quantity = quantity
await db.commit()
return {"message": "Cart updated"}
@api_router.delete("/cart/{item_id}")
async def remove_from_cart(item_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(CartItem).where(and_(CartItem.id == item_id, CartItem.user_id == user.id))
)
item = result.scalar_one_or_none()
if item:
await db.delete(item)
await db.commit()
return {"message": "Item removed from cart"}
@api_router.delete("/cart")
async def clear_cart(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
await db.execute(
CartItem.__table__.delete().where(CartItem.user_id == user.id)
)
await db.commit()
return {"message": "Cart cleared"}
# ================== ORDERS ROUTES ==================
@api_router.post("/orders")
async def create_order(order_data: OrderCreate, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
# Get cart items
result = await db.execute(
select(CartItem)
.where(CartItem.user_id == user.id)
.options(selectinload(CartItem.product))
)
cart_items = result.scalars().all()
if not cart_items:
raise HTTPException(status_code=400, detail="Cart is empty")
# Calculate totals
subtotal = sum(item.product.price * item.quantity for item in cart_items)
tax = subtotal * 0.08
shipping = 0 if subtotal > 100 else 9.99
total = subtotal + tax + shipping
# Create order
order = Order(
user_id=user.id,
status=OrderStatus.PENDING,
subtotal=subtotal,
tax=tax,
shipping=shipping,
total=total,
shipping_address=order_data.shipping_address,
notes=order_data.notes
)
db.add(order)
await db.flush()
# Create order items and update inventory
for cart_item in cart_items:
product = cart_item.product
order_item = OrderItem(
order_id=order.id,
product_id=product.id,
quantity=cart_item.quantity,
price=product.price,
product_name=product.name,
product_image=product.image_url
)
db.add(order_item)
# Update stock
previous_stock = product.stock
product.stock = max(0, product.stock - cart_item.quantity)
# Log inventory change
inv_log = InventoryLog(
product_id=product.id,
action="sale",
quantity_change=-cart_item.quantity,
previous_stock=previous_stock,
new_stock=product.stock,
notes=f"Order {order.id}",
created_by=user.id
)
db.add(inv_log)
# Add status history
status_history = OrderStatusHistory(
order_id=order.id,
status=OrderStatus.PENDING,
notes="Order placed",
created_by=user.id
)
db.add(status_history)
# Clear cart
await db.execute(CartItem.__table__.delete().where(CartItem.user_id == user.id))
await db.commit()
await db.refresh(order)
return {"message": "Order created successfully", "order_id": order.id}
@api_router.get("/orders")
async def get_orders(user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Order)
.where(Order.user_id == user.id)
.options(selectinload(Order.items), selectinload(Order.status_history))
.order_by(desc(Order.created_at))
)
orders = result.scalars().all()
return [order_to_dict(o) for o in orders]
@api_router.get("/orders/{order_id}")
async def get_order(order_id: str, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Order)
.where(and_(Order.id == order_id, Order.user_id == user.id))
.options(selectinload(Order.items), selectinload(Order.status_history))
)
order = result.scalar_one_or_none()
if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order_to_dict(order)
# ================== REVIEWS ROUTES ==================
@api_router.post("/reviews")
async def create_review(review_data: ReviewCreate, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)):
if not review_data.product_id and not review_data.service_id:
raise HTTPException(status_code=400, detail="Product or service ID required")
if review_data.rating < 1 or review_data.rating > 5:
raise HTTPException(status_code=400, detail="Rating must be between 1 and 5")
# Check for verified purchase
is_verified = False
if review_data.product_id:
result = await db.execute(
select(OrderItem)
.join(Order)
.where(
and_(
Order.user_id == user.id,
OrderItem.product_id == review_data.product_id,
Order.status.in_([OrderStatus.DELIVERED, OrderStatus.SHIPPED])
)
)
)
if result.scalar_one_or_none():
is_verified = True
review = Review(
user_id=user.id,
product_id=review_data.product_id,
service_id=review_data.service_id,
rating=review_data.rating,
title=review_data.title,
comment=review_data.comment,
is_verified_purchase=is_verified
)
db.add(review)
await db.commit()
await db.refresh(review)
return {"message": "Review submitted successfully", "review_id": review.id}
@api_router.get("/reviews/product/{product_id}")
async def get_product_reviews(product_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Review)
.where(and_(Review.product_id == product_id, Review.is_approved == True))
.options(selectinload(Review.user))
.order_by(desc(Review.created_at))
)
reviews = result.scalars().all()
return [review_to_dict(r) for r in reviews]
@api_router.get("/reviews/service/{service_id}")
async def get_service_reviews(service_id: str, db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Review)
.where(and_(Review.service_id == service_id, Review.is_approved == True))
.options(selectinload(Review.user))
.order_by(desc(Review.created_at))
)
reviews = result.scalars().all()
return [review_to_dict(r) for r in reviews]
# ================== CONTACT ROUTES ==================
@api_router.post("/contact")
async def submit_contact(contact_data: ContactCreate, db: AsyncSession = Depends(get_db)):
contact = Contact(
name=contact_data.name,
email=contact_data.email,
subject=contact_data.subject,
message=contact_data.message
)
db.add(contact)
await db.commit()
return {"message": "Message sent successfully", "id": contact.id}
# ================== ADMIN ROUTES ==================
# Admin - Dashboard Stats
@api_router.get("/admin/dashboard")
async def get_admin_dashboard(user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
today = datetime.now(timezone.utc).date()
month_ago = today - timedelta(days=30)
# Total counts
products_count = await db.execute(select(func.count(Product.id)))
services_count = await db.execute(select(func.count(Service.id)))
users_count = await db.execute(select(func.count(User.id)))
orders_count = await db.execute(select(func.count(Order.id)))
# Revenue
total_revenue = await db.execute(select(func.sum(Order.total)))
monthly_revenue = await db.execute(
select(func.sum(Order.total))
.where(Order.created_at >= datetime.combine(month_ago, datetime.min.time()))
)
# Today's stats
today_orders = await db.execute(
select(func.count(Order.id))
.where(func.date(Order.created_at) == today)
)
today_revenue = await db.execute(
select(func.sum(Order.total))
.where(func.date(Order.created_at) == today)
)
# Low stock products - don't use relationships
low_stock = await db.execute(
select(Product)
.where(Product.stock <= Product.low_stock_threshold)
.where(Product.is_active == True)
)
low_stock_list = low_stock.scalars().all()
low_stock_products = [{
"id": p.id,
"name": p.name,
"stock": p.stock,
"low_stock_threshold": p.low_stock_threshold,
"category": p.category
} for p in low_stock_list]
# Recent orders - load items explicitly
recent_orders_result = await db.execute(
select(Order)
.options(selectinload(Order.items))
.order_by(desc(Order.created_at))
.limit(10)
)
recent_orders_list = recent_orders_result.scalars().all()
# Pending bookings
pending_bookings = await db.execute(
select(func.count(Booking.id))
.where(Booking.status == "pending")
)
recent_orders_data = []
for o in recent_orders_list:
recent_orders_data.append({
"id": o.id,
"status": o.status.value if o.status else "pending",
"total": o.total,
"created_at": o.created_at.isoformat() if o.created_at else None,
"items": [{"id": i.id, "product_name": i.product_name, "quantity": i.quantity} for i in o.items] if o.items else []
})
return {
"stats": {
"total_products": products_count.scalar() or 0,
"total_services": services_count.scalar() or 0,
"total_users": users_count.scalar() or 0,
"total_orders": orders_count.scalar() or 0,
"total_revenue": total_revenue.scalar() or 0,
"monthly_revenue": monthly_revenue.scalar() or 0,
"today_orders": today_orders.scalar() or 0,
"today_revenue": today_revenue.scalar() or 0,
"pending_bookings": pending_bookings.scalar() or 0
},
"low_stock_products": low_stock_products,
"recent_orders": recent_orders_data
}
return {
"stats": {
"total_products": products_count.scalar() or 0,
"total_services": services_count.scalar() or 0,
"total_users": users_count.scalar() or 0,
"total_orders": orders_count.scalar() or 0,
"total_revenue": total_revenue.scalar() or 0,
"monthly_revenue": monthly_revenue.scalar() or 0,
"today_orders": today_orders.scalar() or 0,
"today_revenue": today_revenue.scalar() or 0,
"pending_bookings": pending_bookings.scalar() or 0
},
"low_stock_products": low_stock_products,
"recent_orders": recent_orders_data
}
# Admin - Products CRUD
@api_router.get("/admin/products")
async def admin_get_products(
include_inactive: bool = False,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
query = select(Product)
if not include_inactive:
query = query.where(Product.is_active == True)
query = query.order_by(desc(Product.created_at))
result = await db.execute(query)
products = result.scalars().all()
return [product_to_dict(p) for p in products]
@api_router.post("/admin/products")
async def admin_create_product(product_data: ProductCreate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
product = Product(**product_data.model_dump())
db.add(product)
await db.commit()
await db.refresh(product)
# Log inventory
inv_log = InventoryLog(
product_id=product.id,
action="add",
quantity_change=product.stock,
previous_stock=0,
new_stock=product.stock,
notes="Initial stock",
created_by=user.id
)
db.add(inv_log)
await db.commit()
return product_to_dict(product)
@api_router.put("/admin/products/{product_id}")
async def admin_update_product(product_id: str, product_data: ProductUpdate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product).where(Product.id == product_id))
product = result.scalar_one_or_none()
if not product:
raise HTTPException(status_code=404, detail="Product not found")
update_data = product_data.model_dump(exclude_unset=True)
# Track stock changes
if "stock" in update_data and update_data["stock"] != product.stock:
inv_log = InventoryLog(
product_id=product.id,
action="adjust",
quantity_change=update_data["stock"] - product.stock,
previous_stock=product.stock,
new_stock=update_data["stock"],
notes="Manual adjustment",
created_by=user.id
)
db.add(inv_log)
for key, value in update_data.items():
setattr(product, key, value)
await db.commit()
await db.refresh(product)
return product_to_dict(product)
@api_router.delete("/admin/products/{product_id}")
async def admin_delete_product(product_id: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product).where(Product.id == product_id))
product = result.scalar_one_or_none()
if not product:
raise HTTPException(status_code=404, detail="Product not found")
product.is_active = False
await db.commit()
return {"message": "Product deleted"}
# Admin - Services CRUD
@api_router.get("/admin/services")
async def admin_get_services(include_inactive: bool = False, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
query = select(Service)
if not include_inactive:
query = query.where(Service.is_active == True)
query = query.order_by(desc(Service.created_at))
result = await db.execute(query)
services = result.scalars().all()
return [service_to_dict(s) for s in services]
@api_router.post("/admin/services")
async def admin_create_service(service_data: ServiceCreate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
service = Service(**service_data.model_dump())
db.add(service)
await db.commit()
await db.refresh(service)
return service_to_dict(service)
@api_router.put("/admin/services/{service_id}")
async def admin_update_service(service_id: str, service_data: ServiceUpdate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Service).where(Service.id == service_id))
service = result.scalar_one_or_none()
if not service:
raise HTTPException(status_code=404, detail="Service not found")
update_data = service_data.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(service, key, value)
await db.commit()
await db.refresh(service)
return service_to_dict(service)
@api_router.delete("/admin/services/{service_id}")
async def admin_delete_service(service_id: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Service).where(Service.id == service_id))
service = result.scalar_one_or_none()
if not service:
raise HTTPException(status_code=404, detail="Service not found")
service.is_active = False
await db.commit()
return {"message": "Service deleted"}
# Admin - Orders Management
@api_router.get("/admin/orders")
async def admin_get_orders(
status: Optional[str] = None,
limit: int = 50,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
query = select(Order).options(selectinload(Order.items), selectinload(Order.status_history), selectinload(Order.user))
if status:
query = query.where(Order.status == OrderStatus(status))
query = query.order_by(desc(Order.created_at)).limit(limit)
result = await db.execute(query)
orders = result.scalars().all()
return [{
**order_to_dict(o),
"user_name": o.user.name if o.user else "Unknown",
"user_email": o.user.email if o.user else "Unknown"
} for o in orders]
@api_router.put("/admin/orders/{order_id}/status")
async def admin_update_order_status(order_id: str, status_data: OrderStatusUpdate, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Order).where(Order.id == order_id).options(selectinload(Order.items))
)
order = result.scalar_one_or_none()
if not order:
raise HTTPException(status_code=404, detail="Order not found")
new_status = OrderStatus(status_data.status)
order.status = new_status
if status_data.tracking_number:
order.tracking_number = status_data.tracking_number
# Handle refunds - restore stock
if new_status == OrderStatus.REFUNDED:
for item in order.items:
result = await db.execute(select(Product).where(Product.id == item.product_id))
product = result.scalar_one_or_none()
if product:
previous_stock = product.stock
product.stock += item.quantity
inv_log = InventoryLog(
product_id=product.id,
action="refund",
quantity_change=item.quantity,
previous_stock=previous_stock,
new_stock=product.stock,
notes=f"Refund for order {order_id}",
created_by=user.id
)
db.add(inv_log)
# Add status history
status_history = OrderStatusHistory(
order_id=order.id,
status=new_status,
notes=status_data.notes,
created_by=user.id
)
db.add(status_history)
await db.commit()
return {"message": "Order status updated"}
# Admin - Inventory Management
@api_router.get("/admin/inventory")
async def admin_get_inventory(user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(Product)
.where(Product.is_active == True)
.order_by(Product.stock)
)
products = result.scalars().all()
return [{
**product_to_dict(p),
"is_low_stock": p.stock <= p.low_stock_threshold
} for p in products]
@api_router.post("/admin/inventory/{product_id}/adjust")
async def admin_adjust_inventory(product_id: str, adjustment: InventoryAdjust, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Product).where(Product.id == product_id))
product = result.scalar_one_or_none()
if not product:
raise HTTPException(status_code=404, detail="Product not found")
previous_stock = product.stock
product.stock = max(0, product.stock + adjustment.quantity_change)
inv_log = InventoryLog(
product_id=product.id,
action="adjust" if adjustment.quantity_change >= 0 else "remove",
quantity_change=adjustment.quantity_change,
previous_stock=previous_stock,
new_stock=product.stock,
notes=adjustment.notes,
created_by=user.id
)
db.add(inv_log)
await db.commit()
return {"message": "Inventory adjusted", "new_stock": product.stock}
@api_router.get("/admin/inventory/{product_id}/logs")
async def admin_get_inventory_logs(product_id: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(
select(InventoryLog)
.where(InventoryLog.product_id == product_id)
.order_by(desc(InventoryLog.created_at))
.limit(50)
)
logs = result.scalars().all()
return [{
"id": log.id,
"action": log.action,
"quantity_change": log.quantity_change,
"previous_stock": log.previous_stock,
"new_stock": log.new_stock,
"notes": log.notes,
"created_at": log.created_at.isoformat() if log.created_at else None
} for log in logs]
# Admin - Bookings Management
@api_router.get("/admin/bookings")
async def admin_get_bookings(status: Optional[str] = None, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
query = select(Booking).options(selectinload(Booking.service))
if status:
query = query.where(Booking.status == status)
query = query.order_by(desc(Booking.created_at))
result = await db.execute(query)
bookings = result.scalars().all()
return [booking_to_dict(b) for b in bookings]
@api_router.put("/admin/bookings/{booking_id}/status")
async def admin_update_booking_status(booking_id: str, status: str, user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(Booking).where(Booking.id == booking_id))
booking = result.scalar_one_or_none()
if not booking:
raise HTTPException(status_code=404, detail="Booking not found")
booking.status = status
await db.commit()
return {"message": "Booking status updated"}
# Admin - Users Management
@api_router.get("/admin/users")
async def admin_get_users(user: User = Depends(get_admin_user), db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).order_by(desc(User.created_at)))
users = result.scalars().all()
return [user_to_dict(u) for u in users]
# Admin - Reports
@api_router.get("/admin/reports/sales")
async def admin_get_sales_report(
period: str = "daily", # daily, weekly, monthly
start_date: Optional[str] = None,
end_date: Optional[str] = None,
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
now = datetime.now(timezone.utc)
if start_date:
start = datetime.fromisoformat(start_date.replace('Z', '+00:00'))
else:
if period == "daily":
start = now - timedelta(days=30)
elif period == "weekly":
start = now - timedelta(weeks=12)
else:
start = now - timedelta(days=365)
if end_date:
end = datetime.fromisoformat(end_date.replace('Z', '+00:00'))
else:
end = now
# Get orders in date range
result = await db.execute(
select(Order)
.where(and_(Order.created_at >= start, Order.created_at <= end))
.options(selectinload(Order.items))
.order_by(Order.created_at)
)
orders = result.scalars().all()
# Get bookings in date range
bookings_result = await db.execute(
select(Booking)
.where(and_(Booking.created_at >= start, Booking.created_at <= end))
)
bookings = bookings_result.scalars().all()
# Aggregate by period
report_data = {}
for order in orders:
if period == "daily":
key = order.created_at.strftime("%Y-%m-%d")
elif period == "weekly":
key = order.created_at.strftime("%Y-W%W")
else:
key = order.created_at.strftime("%Y-%m")
if key not in report_data:
report_data[key] = {
"period": key,
"orders": 0,
"revenue": 0,
"products_sold": 0,
"order_statuses": {}
}
report_data[key]["orders"] += 1
report_data[key]["revenue"] += order.total
report_data[key]["products_sold"] += sum(item.quantity for item in order.items)
status = order.status.value if order.status else "unknown"
report_data[key]["order_statuses"][status] = report_data[key]["order_statuses"].get(status, 0) + 1
# Add booking counts
for booking in bookings:
if period == "daily":
key = booking.created_at.strftime("%Y-%m-%d")
elif period == "weekly":
key = booking.created_at.strftime("%Y-W%W")
else:
key = booking.created_at.strftime("%Y-%m")
if key not in report_data:
report_data[key] = {
"period": key,
"orders": 0,
"revenue": 0,
"products_sold": 0,
"order_statuses": {}
}
report_data[key]["services_booked"] = report_data[key].get("services_booked", 0) + 1
# Calculate totals
total_orders = len(orders)
total_revenue = sum(o.total for o in orders)
total_products = sum(sum(item.quantity for item in o.items) for o in orders)
total_bookings = len(bookings)
return {
"period": period,
"start_date": start.isoformat(),
"end_date": end.isoformat(),
"summary": {
"total_orders": total_orders,
"total_revenue": total_revenue,
"total_products_sold": total_products,
"total_services_booked": total_bookings,
"average_order_value": total_revenue / total_orders if total_orders > 0 else 0
},
"data": list(report_data.values())
}
# Admin - Export Reports
@api_router.get("/admin/reports/export/csv")
async def admin_export_csv(
report_type: str = "sales", # sales, inventory, orders
period: str = "monthly",
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
output = io.StringIO()
writer = csv.writer(output)
now = datetime.now(timezone.utc)
if report_type == "sales":
if period == "daily":
start = now - timedelta(days=30)
elif period == "weekly":
start = now - timedelta(weeks=12)
else:
start = now - timedelta(days=365)
result = await db.execute(
select(Order)
.where(Order.created_at >= start)
.options(selectinload(Order.items), selectinload(Order.user))
.order_by(Order.created_at)
)
orders = result.scalars().all()
writer.writerow(["Date", "Order ID", "Customer", "Items", "Subtotal", "Tax", "Shipping", "Total", "Status"])
for order in orders:
writer.writerow([
order.created_at.strftime("%Y-%m-%d %H:%M"),
order.id,
order.user.name if order.user else "Guest",
sum(item.quantity for item in order.items),
f"${order.subtotal:.2f}",
f"${order.tax:.2f}",
f"${order.shipping:.2f}",
f"${order.total:.2f}",
order.status.value if order.status else "unknown"
])
elif report_type == "inventory":
result = await db.execute(select(Product).where(Product.is_active == True))
products = result.scalars().all()
writer.writerow(["Product ID", "Name", "Category", "Brand", "Price", "Stock", "Low Stock Threshold", "Status"])
for product in products:
writer.writerow([
product.id,
product.name,
product.category,
product.brand,
f"${product.price:.2f}",
product.stock,
product.low_stock_threshold,
"Low Stock" if product.stock <= product.low_stock_threshold else "In Stock"
])
elif report_type == "orders":
result = await db.execute(
select(Order)
.options(selectinload(Order.items), selectinload(Order.user))
.order_by(desc(Order.created_at))
.limit(500)
)
orders = result.scalars().all()
writer.writerow(["Order ID", "Date", "Customer", "Email", "Items", "Total", "Status", "Tracking"])
for order in orders:
writer.writerow([
order.id,
order.created_at.strftime("%Y-%m-%d %H:%M"),
order.user.name if order.user else "Guest",
order.user.email if order.user else "",
sum(item.quantity for item in order.items),
f"${order.total:.2f}",
order.status.value if order.status else "unknown",
order.tracking_number or ""
])
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={report_type}_report_{now.strftime('%Y%m%d')}.csv"}
)
@api_router.get("/admin/reports/export/pdf")
async def admin_export_pdf(
report_type: str = "sales",
period: str = "monthly",
user: User = Depends(get_admin_user),
db: AsyncSession = Depends(get_db)
):
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=A4)
styles = getSampleStyleSheet()
elements = []
now = datetime.now(timezone.utc)
# Title
title_style = ParagraphStyle(
'CustomTitle',
parent=styles['Heading1'],
fontSize=24,
spaceAfter=30
)
elements.append(Paragraph(f"TechZone {report_type.title()} Report", title_style))
elements.append(Paragraph(f"Generated: {now.strftime('%Y-%m-%d %H:%M')}", styles['Normal']))
elements.append(Spacer(1, 20))
if report_type == "sales":
if period == "daily":
start = now - timedelta(days=30)
elif period == "weekly":
start = now - timedelta(weeks=12)
else:
start = now - timedelta(days=365)
result = await db.execute(
select(Order)
.where(Order.created_at >= start)
.options(selectinload(Order.items))
)
orders = result.scalars().all()
# Summary
total_orders = len(orders)
total_revenue = sum(o.total for o in orders)
total_products = sum(sum(item.quantity for item in o.items) for o in orders)
elements.append(Paragraph("Summary", styles['Heading2']))
summary_data = [
["Metric", "Value"],
["Total Orders", str(total_orders)],
["Total Revenue", f"${total_revenue:.2f}"],
["Products Sold", str(total_products)],
["Average Order Value", f"${total_revenue/total_orders:.2f}" if total_orders > 0 else "$0.00"]
]
summary_table = Table(summary_data, colWidths=[3*inch, 2*inch])
summary_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
elements.append(summary_table)
elements.append(Spacer(1, 20))
# Orders table
elements.append(Paragraph("Recent Orders", styles['Heading2']))
orders_data = [["Date", "Order ID", "Items", "Total", "Status"]]
for order in orders[:50]:
orders_data.append([
order.created_at.strftime("%Y-%m-%d"),
order.id[:8] + "...",
str(sum(item.quantity for item in order.items)),
f"${order.total:.2f}",
order.status.value if order.status else "unknown"
])
orders_table = Table(orders_data, colWidths=[1.2*inch, 1.2*inch, 0.8*inch, 1*inch, 1*inch])
orders_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 9),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('GRID', (0, 0), (-1, -1), 0.5, colors.black)
]))
elements.append(orders_table)
elif report_type == "inventory":
result = await db.execute(select(Product).where(Product.is_active == True).order_by(Product.stock))
products = result.scalars().all()
elements.append(Paragraph("Inventory Status", styles['Heading2']))
inv_data = [["Product", "Category", "Price", "Stock", "Status"]]
for product in products:
status = "LOW STOCK" if product.stock <= product.low_stock_threshold else "In Stock"
inv_data.append([
product.name[:30] + "..." if len(product.name) > 30 else product.name,
product.category,
f"${product.price:.2f}",
str(product.stock),
status
])
inv_table = Table(inv_data, colWidths=[2*inch, 1*inch, 0.8*inch, 0.6*inch, 0.8*inch])
inv_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 8),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('GRID', (0, 0), (-1, -1), 0.5, colors.black)
]))
elements.append(inv_table)
doc.build(elements)
buffer.seek(0)
return StreamingResponse(
buffer,
media_type="application/pdf",
headers={"Content-Disposition": f"attachment; filename={report_type}_report_{now.strftime('%Y%m%d')}.pdf"}
)
# ================== SEED DATA ==================
@api_router.post("/seed")
async def seed_data(db: AsyncSession = Depends(get_db)):
# Check if data exists
result = await db.execute(select(func.count(Product.id)))
if result.scalar() > 0:
return {"message": "Data already seeded"}
# Create admin user
admin = User(
email="admin@techzone.com",
name="Admin",
password=hash_password("admin123"),
role=UserRole.ADMIN
)
db.add(admin)
# Create products
products = [
Product(name="MacBook Pro 16\"", description="Powerful laptop with M3 Pro chip, 18GB RAM, 512GB SSD.", price=2499.99, category="laptops", image_url="https://images.unsplash.com/photo-1517336714731-489689fd1ca8?w=800", stock=15, brand="Apple", specs={"processor": "M3 Pro", "ram": "18GB", "storage": "512GB SSD"}),
Product(name="Dell XPS 15", description="Ultra-thin laptop with Intel Core i7, 16GB RAM, stunning OLED display.", price=1799.99, category="laptops", image_url="https://images.unsplash.com/photo-1593642632559-0c6d3fc62b89?w=800", stock=20, brand="Dell", specs={"processor": "Intel i7", "ram": "16GB", "storage": "512GB SSD"}),
Product(name="iPhone 15 Pro Max", description="Latest iPhone with titanium design, A17 Pro chip, 48MP camera.", price=1199.99, category="phones", image_url="https://images.unsplash.com/photo-1695048133142-1a20484d2569?w=800", stock=30, brand="Apple", specs={"display": "6.7\" OLED", "camera": "48MP", "storage": "256GB"}),
Product(name="Samsung Galaxy S24 Ultra", description="Premium Android phone with S Pen, 200MP camera, AI features.", price=1299.99, category="phones", image_url="https://images.unsplash.com/photo-1610945265064-0e34e5519bbf?w=800", stock=25, brand="Samsung", specs={"display": "6.8\" AMOLED", "camera": "200MP", "storage": "512GB"}),
Product(name="Sony WH-1000XM5", description="Industry-leading noise cancellation, 30-hour battery.", price=349.99, category="accessories", image_url="https://images.unsplash.com/photo-1505740420928-5e560c06d30e?w=800", stock=40, brand="Sony", specs={"type": "Over-ear", "battery": "30 hours"}),
Product(name="iPad Pro 12.9\"", description="Powerful tablet with M2 chip, Liquid Retina XDR display.", price=1099.99, category="tablets", image_url="https://images.unsplash.com/photo-1544244015-0df4b3ffc6b0?w=800", stock=18, brand="Apple", specs={"processor": "M2", "display": "12.9\" XDR"}),
Product(name="Apple Watch Ultra 2", description="Rugged smartwatch with titanium case, GPS, 36-hour battery.", price=799.99, category="wearables", image_url="https://images.unsplash.com/photo-1434493789847-2f02dc6ca35d?w=800", stock=22, brand="Apple", specs={"display": "49mm", "battery": "36 hours"}),
Product(name="Logitech MX Master 3S", description="Premium wireless mouse with 8K DPI sensor, silent clicks.", price=99.99, category="accessories", image_url="https://images.unsplash.com/photo-1527864550417-7fd91fc51a46?w=800", stock=50, brand="Logitech", specs={"sensor": "8K DPI", "battery": "70 days"}),
]
for p in products:
db.add(p)
# Create services
services = [
Service(name="Screen Repair", description="Professional screen replacement for phones, tablets, and laptops.", price=149.99, duration="1-2 hours", image_url="https://images.unsplash.com/photo-1581092918056-0c4c3acd3789?w=800", category="repair"),
Service(name="Battery Replacement", description="Restore your device's battery life with genuine replacement.", price=79.99, duration="30-60 mins", image_url="https://images.unsplash.com/photo-1609091839311-d5365f9ff1c5?w=800", category="repair"),
Service(name="Data Recovery", description="Professional data recovery from damaged drives.", price=199.99, duration="2-5 days", image_url="https://images.unsplash.com/photo-1558494949-ef010cbdcc31?w=800", category="data"),
Service(name="Virus Removal", description="Complete malware and virus removal with system optimization.", price=89.99, duration="1-3 hours", image_url="https://images.unsplash.com/photo-1526374965328-7f61d4dc18c5?w=800", category="software"),
Service(name="Hardware Upgrade", description="Upgrade your RAM, SSD, or other components.", price=49.99, duration="1-2 hours", image_url="https://images.unsplash.com/photo-1591799265444-d66432b91588?w=800", category="upgrade"),
Service(name="Device Setup", description="Complete setup service for new devices including data transfer.", price=59.99, duration="1-2 hours", image_url="https://images.unsplash.com/photo-1531297484001-80022131f5a1?w=800", category="setup"),
]
for s in services:
db.add(s)
await db.commit()
return {"message": "Data seeded successfully"}
# ================== ROOT ==================
@api_router.get("/")
async def root():
return {"message": "TechZone API is running", "version": "2.0.0"}
# Include the router
app.include_router(api_router)
app.add_middleware(
CORSMiddleware,
allow_credentials=True,
allow_origins=os.environ.get('CORS_ORIGINS', '*').split(','),
allow_methods=["*"],
allow_headers=["*"],
)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@app.on_event("startup")
async def startup_event():
await init_db()
logger.info("Database initialized")
@app.on_event("shutdown")
async def shutdown_event():
pass