Initial commit - PromptTech

commit 3959a223bf
2026-01-27 18:07:00 -06:00
262 changed files with 128736 additions and 0 deletions

backend/.env Normal file

@@ -0,0 +1,6 @@
MONGO_URL="mongodb://localhost:27017"
DB_NAME="test_database"
CORS_ORIGINS="*"
JWT_SECRET="techzone-super-secret-key-2024-production"
DATABASE_URL="postgresql+asyncpg://techzone_user:techzone_pass@localhost:5432/techzone"
PORT=8181
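The backend reads these values at startup. A minimal sketch of how they might be consumed (python-dotenv is pinned in requirements.txt; the variable names and the DATABASE_URL default come from this file and database.py):

```python
import os

from dotenv import load_dotenv  # provided by python-dotenv (see requirements.txt)

load_dotenv()  # reads backend/.env into the process environment

DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://techzone_user:techzone_pass@localhost:5432/techzone",
)
PORT = int(os.environ.get("PORT", 8181))
```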


@@ -0,0 +1,43 @@
# VS Code Python Interpreter Configuration
The red import errors you're seeing are because VS Code needs to be configured to use the virtual environment.
## Quick Fix
1. **Open Command Palette** (Ctrl+Shift+P or Cmd+Shift+P)
2. Type: `Python: Select Interpreter`
3. Choose: `./venv/bin/python` or `Python 3.x.x ('venv': venv)`
## Alternative: Check .vscode/settings.json
Ensure this file exists in your project root with:
```json
{
"python.defaultInterpreterPath": "${workspaceFolder}/backend/venv/bin/python",
"python.terminal.activateEnvironment": true,
"python.analysis.extraPaths": [
"${workspaceFolder}/backend"
]
}
```
## Verify Installation
The files are actually working correctly. You can verify by running:
```bash
cd backend
source venv/bin/activate
python check_database_health.py # Works!
python test_upload.py # Works!
python optimize_database.py # Works!
```
## The 3 Files Showing Red (but working correctly)
1. **optimize_database.py** - Database optimization script
2. **check_database_health.py** - Database health monitoring
3. **test_upload.py** - Image upload testing utility
All dependencies are installed in `backend/venv/` - just need to tell VS Code to use it.
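If the status bar still seems to show the wrong interpreter, a one-liner from the activated terminal confirms which Python is actually running:

```python
import sys
print(sys.executable)  # should end in backend/venv/bin/python
```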

backend/check_database_health.py Normal file

@@ -0,0 +1,305 @@
"""
Database Health Check Script
Verifies schema correctness, relationship integrity, and performance
"""
import asyncio
from sqlalchemy import text, inspect
from database import async_engine
import logging
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)
async def check_schema():
"""Verify database schema matches models"""
async with async_engine.connect() as conn:
logger.info("="*70)
logger.info("DATABASE HEALTH CHECK")
logger.info("="*70)
# 1. Table Count
logger.info("\n📊 1. SCHEMA VERIFICATION")
logger.info("-" * 70)
result = await conn.execute(text("""
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public'
"""))
table_count = result.scalar()
expected_tables = 13 # Based on models.py
status = "" if table_count == expected_tables else "⚠️ "
logger.info(f" Tables: {table_count}/{expected_tables} {status}")
# 2. Foreign Keys
logger.info("\n🔗 2. RELATIONSHIPS & CONSTRAINTS")
logger.info("-" * 70)
result = await conn.execute(text("""
SELECT COUNT(*)
FROM information_schema.table_constraints
WHERE constraint_type = 'FOREIGN KEY' AND table_schema = 'public'
"""))
fk_count = result.scalar()
logger.info(f" Foreign Keys: {fk_count}")
# 3. Indexes
result = await conn.execute(text("""
SELECT COUNT(*)
FROM pg_indexes
WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey'
"""))
index_count = result.scalar()
logger.info(f" Indexes (non-PK): {index_count}")
# 4. Check Constraints
result = await conn.execute(text("""
SELECT COUNT(*)
FROM pg_constraint
WHERE contype = 'c' AND connamespace::regnamespace::text = 'public'
"""))
check_count = result.scalar()
logger.info(f" Check Constraints: {check_count}")
# 5. Verify all foreign keys have indexes
logger.info("\n⚡ 3. PERFORMANCE VALIDATION")
logger.info("-" * 70)
result = await conn.execute(text("""
SELECT
c.conrelid::regclass AS table_name,
a.attname AS column_name
FROM pg_constraint c
JOIN pg_attribute a ON a.attnum = ANY(c.conkey) AND a.attrelid = c.conrelid
LEFT JOIN pg_index i ON i.indrelid = c.conrelid
AND a.attnum = ANY(i.indkey)
WHERE c.contype = 'f' AND i.indexrelid IS NULL
"""))
missing_indexes = result.fetchall()
if missing_indexes:
logger.info(f" ⚠️ {len(missing_indexes)} foreign keys without indexes:")
for table, column in missing_indexes:
logger.info(f" {table}.{column}")
else:
logger.info(" ✅ All foreign keys are indexed")
# 6. Check for duplicate indexes
result = await conn.execute(text("""
SELECT
t.relname AS table_name,
i.relname AS index_name,
array_agg(a.attname ORDER BY a.attnum) AS columns
FROM pg_class t
JOIN pg_index ix ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
WHERE t.relkind = 'r'
AND t.relnamespace::regnamespace::text = 'public'
GROUP BY t.relname, i.relname
HAVING COUNT(*) > 0
"""))
indexes = result.fetchall()
# Group by columns to find duplicates
index_map = {}
for table, idx_name, columns in indexes:
key = (table, tuple(columns))
if key not in index_map:
index_map[key] = []
index_map[key].append(idx_name)
duplicates = [(k, v) for k, v in index_map.items() if len(v) > 1]
if duplicates:
logger.info(f" ⚠️ {len(duplicates)} duplicate index patterns found")
for (table, cols), names in duplicates[:3]:
logger.info(f" {table}{list(cols)}: {', '.join(names)}")
else:
logger.info(" ✅ No duplicate indexes found")
# 7. Table sizes and row counts
logger.info("\n📦 4. TABLE STATISTICS")
logger.info("-" * 70)
result = await conn.execute(text("""
SELECT
c.relname AS table_name,
pg_size_pretty(pg_total_relation_size(c.oid)) AS size,
c.reltuples::bigint AS estimated_rows
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE relkind = 'r' AND nspname = 'public'
ORDER BY pg_total_relation_size(c.oid) DESC
LIMIT 10
"""))
sizes = result.fetchall()
logger.info(f" {'Table':<25} {'Size':>10} {'Rows':>10}")
logger.info(" " + "-" * 48)
for table, size, rows in sizes:
logger.info(f" {table:<25} {size:>10} {rows:>10}")
# 8. Check column data types alignment
logger.info("\n🔍 5. DATA TYPE VERIFICATION")
logger.info("-" * 70)
critical_columns = [
('users', 'email', 'character varying'),
('products', 'price', 'double precision'),
('services', 'price', 'double precision'),
('orders', 'total', 'double precision'),
('reviews', 'rating', 'integer'),
]
issues = []
for table, column, expected_type in critical_columns:
result = await conn.execute(text(f"""
SELECT data_type
FROM information_schema.columns
WHERE table_name = '{table}' AND column_name = '{column}'
"""))
actual_type = result.scalar()
if actual_type != expected_type:
issues.append(f"{table}.{column}: expected {expected_type}, got {actual_type}")
if issues:
logger.info(" ⚠️ Data type mismatches:")
for issue in issues:
logger.info(f" {issue}")
else:
logger.info(" ✅ All critical columns have correct types")
# 9. Check for missing NOT NULL constraints
logger.info("\n🛡️ 6. DATA INTEGRITY")
logger.info("-" * 70)
result = await conn.execute(text("""
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = 'public'
AND column_name IN ('id', 'user_id', 'product_id', 'service_id', 'order_id')
AND is_nullable = 'YES'
ORDER BY table_name, column_name
"""))
nullable_fks = result.fetchall()
if nullable_fks:
logger.info(f" {len(nullable_fks)} nullable foreign/primary keys (by design):")
for table, column in nullable_fks[:5]:
logger.info(f" {table}.{column}")
else:
logger.info(" ✅ All IDs have appropriate nullability")
# 10. Enum types verification
logger.info("\n📋 7. ENUM TYPES")
logger.info("-" * 70)
result = await conn.execute(text("""
SELECT
t.typname AS enum_name,
array_agg(e.enumlabel ORDER BY e.enumsortorder) AS values
FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typnamespace::regnamespace::text = 'public'
GROUP BY t.typname
"""))
enums = result.fetchall()
if enums:
for enum_name, values in enums:
logger.info(f"{enum_name}: {len(values)} values")
else:
logger.info(" ⚠️ No enum types found")
# 11. Index usage statistics
logger.info("\n📈 8. INDEX USAGE")
logger.info("-" * 70)
try:
result = await conn.execute(text("""
SELECT
schemaname,
relname,
indexrelname,
idx_scan,
idx_tup_read
FROM pg_stat_user_indexes
WHERE schemaname = 'public'
ORDER BY idx_scan DESC
LIMIT 10
"""))
index_stats = result.fetchall()
logger.info(f" {'Table':<20} {'Index':<30} {'Scans':>10}")
logger.info(" " + "-" * 62)
for schema, table, idx_name, scans, reads in index_stats:
logger.info(f" {table:<20} {idx_name:<30} {scans:>10}")
except Exception as e:
logger.info(f"   Index statistics not available: {e}")
# 12. Connection settings
logger.info("\n⚙️ 9. DATABASE CONFIGURATION")
logger.info("-" * 70)
settings_to_check = [
'max_connections',
'shared_buffers',
'effective_cache_size',
'work_mem',
'maintenance_work_mem'
]
for setting in settings_to_check:
try:
result = await conn.execute(text(f"SHOW {setting}"))
value = result.scalar()
logger.info(f" {setting:<25} {value}")
except Exception:
pass
# Summary
logger.info("\n" + "="*70)
logger.info("HEALTH CHECK SUMMARY")
logger.info("="*70)
checks = [
(table_count == expected_tables, "Schema structure"),
(fk_count > 0, "Foreign key relationships"),
(index_count > 30, "Performance indexes"),
(check_count > 0, "Data validation constraints"),
(len(missing_indexes) == 0, "Foreign key indexing"),
(len(issues) == 0, "Data type correctness"),
]
passed = sum(1 for check, _ in checks if check)
total = len(checks)
logger.info(f"\n Passed: {passed}/{total} checks")
logger.info("")
for check, description in checks:
status = "" if check else "⚠️ "
logger.info(f" {status} {description}")
if passed == total:
logger.info("\n🎉 Database is healthy and optimized!")
elif passed >= total * 0.8:
logger.info("\n✅ Database is mostly healthy with minor issues")
else:
logger.info("\n⚠️ Database needs attention")
logger.info("\n" + "="*70)
async def main():
try:
await check_schema()
except Exception as e:
logger.error(f"\n❌ Health check failed: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())

backend/create_admin.py Normal file

@@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""Script to create an admin user for the TechZone application"""
import asyncio
import sys
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import bcrypt
from datetime import datetime, timezone
# Import from your app
sys.path.append('/media/pts/Website/PromptTech_Solution_Site/backend')
from models import User, UserRole
from database import DATABASE_URL
async def create_admin():
# Create async engine
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
# Admin credentials
email = "admin@prompttech.com"
password = "admin123"
name = "Admin User"
# Hash password
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
# Create admin user
admin_user = User(
email=email,
name=name,
password=hashed_password,
role=UserRole.ADMIN,
created_at=datetime.now(timezone.utc)
)
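# Note: users.email is unique, so this raises IntegrityError if the admin already exists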
session.add(admin_user)
await session.commit()
await session.refresh(admin_user)
print(f"\n✅ Admin user created successfully!")
print(f"Email: {email}")
print(f"Password: {password}")
print(f"Role: {admin_user.role.value}")
print(f"\n🔐 Please change the password after first login!\n")
if __name__ == "__main__":
asyncio.run(create_admin())

backend/database.py Normal file

@@ -0,0 +1,27 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import create_engine
import os
# PostgreSQL connection string
DATABASE_URL = os.environ.get('DATABASE_URL', 'postgresql+asyncpg://techzone_user:techzone_pass@localhost:5432/techzone')
SYNC_DATABASE_URL = DATABASE_URL.replace('+asyncpg', '')
# Async engine for FastAPI
async_engine = create_async_engine(DATABASE_URL, echo=False)
AsyncSessionLocal = async_sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)
# Sync engine for migrations and seeding
sync_engine = create_engine(SYNC_DATABASE_URL, echo=False)
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
async def init_db():
from models import Base
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
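get_db is written as a FastAPI dependency. A minimal sketch of how a route would consume it (the route path and query here are illustrative; the real endpoints live in server.py, whose diff is suppressed below):

```python
from fastapi import Depends, FastAPI
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models import Product

app = FastAPI()

@app.get("/api/products")
async def list_products(db: AsyncSession = Depends(get_db)):
    # one session per request, closed automatically by get_db
    result = await db.execute(select(Product).where(Product.is_active.is_(True)))
    return result.scalars().all()
```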

backend/models.py Normal file

@@ -0,0 +1,303 @@
from sqlalchemy import Column, Integer, String, Float, Boolean, DateTime, Text, ForeignKey, Enum as SQLEnum, JSON
from sqlalchemy.orm import relationship, declarative_base
from sqlalchemy.sql import func
from datetime import datetime, timezone
import enum
import uuid
Base = declarative_base()
def generate_uuid():
return str(uuid.uuid4())
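# Primary keys are 36-char UUID strings, which work on both SQLite (techzone.db) and PostgreSQL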
class OrderStatus(enum.Enum):
PENDING = "pending"
PROCESSING = "processing"
LAYAWAY = "layaway"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
REFUNDED = "refunded"
ON_HOLD = "on_hold"
class UserRole(enum.Enum):
USER = "user"
ADMIN = "admin"
EMPLOYEE = "employee"
ACCOUNTANT = "accountant"
SALES_MANAGER = "sales_manager"
class User(Base):
__tablename__ = "users"
id = Column(String(36), primary_key=True, default=generate_uuid)
email = Column(String(255), unique=True, nullable=False, index=True)
name = Column(String(255), nullable=False)
password = Column(String(255), nullable=False)
role = Column(SQLEnum(UserRole), default=UserRole.USER)
is_active = Column(Boolean, default=True, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
cart_items = relationship("CartItem", back_populates="user", cascade="all, delete-orphan")
orders = relationship("Order", back_populates="user")
reviews = relationship("Review", back_populates="user")
bookings = relationship("Booking", back_populates="user")
class Category(Base):
__tablename__ = "categories"
id = Column(String(36), primary_key=True, default=generate_uuid)
name = Column(String(100), unique=True, nullable=False)
slug = Column(String(100), unique=True, nullable=False)
description = Column(Text)
type = Column(String(50), default="product") # product or service
created_at = Column(DateTime(timezone=True), server_default=func.now())
products = relationship("Product", back_populates="category_rel")
services = relationship("Service", back_populates="category_rel")
class Product(Base):
__tablename__ = "products"
id = Column(String(36), primary_key=True, default=generate_uuid)
name = Column(String(255), nullable=False)
description = Column(Text) # Now supports HTML from rich text editor
price = Column(Float, nullable=False)
category = Column(String(100), nullable=False)
category_id = Column(String(36), ForeignKey("categories.id"), nullable=True)
image_url = Column(String(500)) # Deprecated - kept for backwards compatibility
stock = Column(Integer, default=10)
low_stock_threshold = Column(Integer, default=5)
brand = Column(String(100))
specs = Column(JSON, default=dict)  # dict factory avoids sharing one mutable default across rows
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
category_rel = relationship("Category", back_populates="products")
cart_items = relationship("CartItem", back_populates="product")
order_items = relationship("OrderItem", back_populates="product")
reviews = relationship("Review", back_populates="product", cascade="all, delete-orphan")
inventory_logs = relationship("InventoryLog", back_populates="product", cascade="all, delete-orphan")
images = relationship("ProductImage", back_populates="product", cascade="all, delete-orphan", order_by="ProductImage.display_order")
class ProductImage(Base):
__tablename__ = "product_images"
id = Column(String(36), primary_key=True, default=generate_uuid)
product_id = Column(String(36), ForeignKey("products.id"), nullable=False)
image_url = Column(String(500), nullable=False)
display_order = Column(Integer, default=0)
is_primary = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
product = relationship("Product", back_populates="images")
class ServiceImage(Base):
__tablename__ = "service_images"
id = Column(String(36), primary_key=True, default=generate_uuid)
service_id = Column(String(36), ForeignKey("services.id"), nullable=False)
image_url = Column(String(500), nullable=False)
display_order = Column(Integer, default=0)
is_primary = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
service = relationship("Service", back_populates="images")
class Service(Base):
__tablename__ = "services"
id = Column(String(36), primary_key=True, default=generate_uuid)
name = Column(String(255), nullable=False)
description = Column(Text) # Now supports HTML from rich text editor
price = Column(Float, nullable=False)
duration = Column(String(50))
image_url = Column(String(500)) # Deprecated - kept for backwards compatibility
category = Column(String(100), nullable=False)
category_id = Column(String(36), ForeignKey("categories.id"), nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
category_rel = relationship("Category", back_populates="services")
bookings = relationship("Booking", back_populates="service")
reviews = relationship("Review", back_populates="service", cascade="all, delete-orphan")
images = relationship("ServiceImage", back_populates="service", cascade="all, delete-orphan", order_by="ServiceImage.display_order")
class CartItem(Base):
__tablename__ = "cart_items"
id = Column(String(36), primary_key=True, default=generate_uuid)
user_id = Column(String(36), ForeignKey("users.id"), nullable=False)
product_id = Column(String(36), ForeignKey("products.id"), nullable=False)
quantity = Column(Integer, default=1)
created_at = Column(DateTime(timezone=True), server_default=func.now())
user = relationship("User", back_populates="cart_items")
product = relationship("Product", back_populates="cart_items")
class Order(Base):
__tablename__ = "orders"
id = Column(String(36), primary_key=True, default=generate_uuid)
user_id = Column(String(36), ForeignKey("users.id"), nullable=False)
status = Column(SQLEnum(OrderStatus), default=OrderStatus.PENDING)
subtotal = Column(Float, default=0)
tax = Column(Float, default=0)
shipping = Column(Float, default=0)
total = Column(Float, default=0)
shipping_address = Column(JSON, default=dict)
notes = Column(Text)
tracking_number = Column(String(100))
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
user = relationship("User", back_populates="orders")
items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")
status_history = relationship("OrderStatusHistory", back_populates="order", cascade="all, delete-orphan")
class OrderItem(Base):
__tablename__ = "order_items"
id = Column(String(36), primary_key=True, default=generate_uuid)
order_id = Column(String(36), ForeignKey("orders.id"), nullable=False)
product_id = Column(String(36), ForeignKey("products.id"), nullable=False)
quantity = Column(Integer, default=1)
price = Column(Float, nullable=False)
product_name = Column(String(255))
product_image = Column(String(500))
order = relationship("Order", back_populates="items")
product = relationship("Product", back_populates="order_items")
class OrderStatusHistory(Base):
__tablename__ = "order_status_history"
id = Column(String(36), primary_key=True, default=generate_uuid)
order_id = Column(String(36), ForeignKey("orders.id"), nullable=False)
status = Column(SQLEnum(OrderStatus), nullable=False)
notes = Column(Text)
created_at = Column(DateTime(timezone=True), server_default=func.now())
created_by = Column(String(36))
order = relationship("Order", back_populates="status_history")
class Review(Base):
__tablename__ = "reviews"
id = Column(String(36), primary_key=True, default=generate_uuid)
user_id = Column(String(36), ForeignKey("users.id"), nullable=False)
product_id = Column(String(36), ForeignKey("products.id"), nullable=True)
service_id = Column(String(36), ForeignKey("services.id"), nullable=True)
rating = Column(Integer, nullable=False) # 1-5
title = Column(String(255))
comment = Column(Text)
is_verified_purchase = Column(Boolean, default=False)
is_approved = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
user = relationship("User", back_populates="reviews")
product = relationship("Product", back_populates="reviews")
service = relationship("Service", back_populates="reviews")
class Booking(Base):
__tablename__ = "bookings"
id = Column(String(36), primary_key=True, default=generate_uuid)
service_id = Column(String(36), ForeignKey("services.id"), nullable=False)
user_id = Column(String(36), ForeignKey("users.id"), nullable=True)
name = Column(String(255), nullable=False)
email = Column(String(255), nullable=False)
phone = Column(String(50))
preferred_date = Column(String(50))
notes = Column(Text)
status = Column(String(50), default="pending")
service_name = Column(String(255))
created_at = Column(DateTime(timezone=True), server_default=func.now())
service = relationship("Service", back_populates="bookings")
user = relationship("User", back_populates="bookings")
class Contact(Base):
__tablename__ = "contacts"
id = Column(String(36), primary_key=True, default=generate_uuid)
name = Column(String(255), nullable=False)
email = Column(String(255), nullable=False)
subject = Column(String(255))
message = Column(Text, nullable=False)
status = Column(String(50), default="pending")
created_at = Column(DateTime(timezone=True), server_default=func.now())
class InventoryLog(Base):
__tablename__ = "inventory_logs"
id = Column(String(36), primary_key=True, default=generate_uuid)
product_id = Column(String(36), ForeignKey("products.id"), nullable=False)
action = Column(String(50), nullable=False) # add, remove, adjust, sale
quantity_change = Column(Integer, nullable=False)
previous_stock = Column(Integer)
new_stock = Column(Integer)
notes = Column(Text)
created_by = Column(String(36))
created_at = Column(DateTime(timezone=True), server_default=func.now())
product = relationship("Product", back_populates="inventory_logs")
class SalesReport(Base):
__tablename__ = "sales_reports"
id = Column(String(36), primary_key=True, default=generate_uuid)
report_type = Column(String(50), nullable=False) # daily, weekly, monthly
report_date = Column(DateTime(timezone=True), nullable=False)
start_date = Column(DateTime(timezone=True))
end_date = Column(DateTime(timezone=True))
total_orders = Column(Integer, default=0)
total_revenue = Column(Float, default=0)
total_products_sold = Column(Integer, default=0)
total_services_booked = Column(Integer, default=0)
report_data = Column(JSON, default=dict)
created_at = Column(DateTime(timezone=True), server_default=func.now())
class AboutContent(Base):
__tablename__ = "about_content"
id = Column(String(36), primary_key=True, default=generate_uuid)
section = Column(String(50), nullable=False, unique=True) # 'hero', 'story', 'stats'
title = Column(String(255))
subtitle = Column(Text)
content = Column(Text) # HTML content from rich text editor
image_url = Column(String(500))
data = Column(JSON, default=dict)  # For flexible content like stats
is_active = Column(Boolean, default=True)
display_order = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class TeamMember(Base):
__tablename__ = "team_members"
id = Column(String(36), primary_key=True, default=generate_uuid)
name = Column(String(255), nullable=False)
role = Column(String(255), nullable=False)
bio = Column(Text) # HTML content from rich text editor
image_url = Column(String(500))
email = Column(String(255))
linkedin = Column(String(500))
display_order = Column(Integer, default=0)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class CompanyValue(Base):
__tablename__ = "company_values"
id = Column(String(36), primary_key=True, default=generate_uuid)
title = Column(String(255), nullable=False)
description = Column(Text)
icon = Column(String(50)) # Icon name or emoji (e.g., 'Target' or '🎯'; the seed data uses emoji)
display_order = Column(Integer, default=0)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
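The image relationships carry cascade="all, delete-orphan" and an order_by, so gallery rows follow their parent automatically. A small sketch of creating a product with an ordered gallery (field values are illustrative; AsyncSessionLocal and init_db come from database.py):

```python
import asyncio

from database import AsyncSessionLocal, init_db
from models import Product, ProductImage

async def demo():
    await init_db()  # create tables if they don't exist yet
    async with AsyncSessionLocal() as db:
        product = Product(name="Demo Phone", price=199.0, category="Phones")
        product.images = [
            ProductImage(image_url="/uploads/front.jpg", display_order=0, is_primary=True),
            ProductImage(image_url="/uploads/back.jpg", display_order=1),
        ]
        db.add(product)  # the cascade persists the ProductImage rows too
        await db.commit()

asyncio.run(demo())
```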

backend/optimize_database.py Normal file

@@ -0,0 +1,326 @@
"""
Database Optimization Script
Creates missing indexes and constraints for PostgreSQL
"""
import asyncio
from sqlalchemy import text
from database import async_engine
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def optimize_database():
"""Apply database optimizations including indexes and constraints"""
async with async_engine.connect() as conn:
logger.info("Starting database optimization...")
# ============= FOREIGN KEY INDEXES =============
logger.info("\n1. Creating indexes on foreign keys...")
fk_indexes = [
("idx_products_category_id", "products", "category_id"),
("idx_services_category_id", "services", "category_id"),
("idx_orders_user_id", "orders", "user_id"),
("idx_cart_items_user_id", "cart_items", "user_id"),
("idx_cart_items_product_id", "cart_items", "product_id"),
("idx_order_items_order_id", "order_items", "order_id"),
("idx_order_items_product_id", "order_items", "product_id"),
("idx_order_status_history_order_id", "order_status_history", "order_id"),
("idx_reviews_user_id", "reviews", "user_id"),
("idx_reviews_product_id", "reviews", "product_id"),
("idx_reviews_service_id", "reviews", "service_id"),
("idx_bookings_user_id", "bookings", "user_id"),
("idx_bookings_service_id", "bookings", "service_id"),
("idx_inventory_logs_product_id", "inventory_logs", "product_id"),
]
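# PostgreSQL does not index referencing FK columns automatically; without these,
# JOINs and parent-row DELETEs fall back to sequential scans of the child table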
for idx_name, table, column in fk_indexes:
try:
await conn.execute(text(f"""
CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column})
"""))
await conn.commit()
logger.info(f" ✓ Created {idx_name} on {table}.{column}")
except Exception as e:
await conn.rollback()
logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}")
# ============= PERFORMANCE INDEXES =============
logger.info("\n2. Creating performance indexes...")
performance_indexes = [
# Products - frequently filtered columns
("idx_products_is_active", "products", "is_active"),
("idx_products_category", "products", "category"),
("idx_products_stock", "products", "stock"),
("idx_products_created_at", "products", "created_at DESC"),
# Services - frequently filtered columns
("idx_services_is_active", "services", "is_active"),
("idx_services_category", "services", "category"),
("idx_services_created_at", "services", "created_at DESC"),
# Orders - status and date filtering
("idx_orders_status", "orders", "status"),
("idx_orders_created_at", "orders", "created_at DESC"),
("idx_orders_updated_at", "orders", "updated_at DESC"),
# Reviews - approval status
("idx_reviews_is_approved", "reviews", "is_approved"),
("idx_reviews_created_at", "reviews", "created_at DESC"),
# Bookings - status filtering
("idx_bookings_status", "bookings", "status"),
("idx_bookings_created_at", "bookings", "created_at DESC"),
# Inventory logs - date filtering
("idx_inventory_logs_created_at", "inventory_logs", "created_at DESC"),
# Cart items - user lookup
("idx_cart_items_created_at", "cart_items", "created_at DESC"),
]
for idx_name, table, column in performance_indexes:
try:
await conn.execute(text(f"""
CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column})
"""))
await conn.commit()
logger.info(f" ✓ Created {idx_name} on {table}")
except Exception as e:
await conn.rollback()
logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}")
# ============= COMPOSITE INDEXES =============
logger.info("\n3. Creating composite indexes...")
composite_indexes = [
# Products: category + active status (common query pattern)
("idx_products_category_active", "products", ["category", "is_active"]),
# Services: category + active status
("idx_services_category_active", "services", ["category", "is_active"]),
# Orders: user + status (for user order history)
("idx_orders_user_status", "orders", ["user_id", "status"]),
# Reviews: product + approved (for product reviews)
("idx_reviews_product_approved", "reviews", ["product_id", "is_approved"]),
# Reviews: service + approved (for service reviews)
("idx_reviews_service_approved", "reviews", ["service_id", "is_approved"]),
# Inventory logs: product + created_at (for product history)
("idx_inventory_logs_product_date", "inventory_logs", ["product_id", "created_at DESC"]),
]
for idx_name, table, columns in composite_indexes:
try:
cols_str = ", ".join(columns)
await conn.execute(text(f"""
CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({cols_str})
"""))
await conn.commit()
logger.info(f" ✓ Created {idx_name} on {table}({cols_str})")
except Exception as e:
await conn.rollback()
logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}")
# ============= PARTIAL INDEXES =============
logger.info("\n4. Creating partial indexes...")
partial_indexes = [
# Only index active products
("idx_products_active_only", "products", "category", "is_active = true"),
# Only index active services
("idx_services_active_only", "services", "category", "is_active = true"),
# Only index approved reviews
("idx_reviews_approved_only", "reviews", "product_id", "is_approved = true"),
# Only index low stock products
("idx_products_low_stock", "products", "id",
"stock <= low_stock_threshold AND is_active = true"),
]
for idx_name, table, column, condition in partial_indexes:
try:
await conn.execute(text(f"""
CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column})
WHERE {condition}
"""))
await conn.commit()
logger.info(f" ✓ Created partial index {idx_name}")
except Exception as e:
await conn.rollback()
logger.error(f" ✗ Failed to create {idx_name}: {str(e)[:100]}")
# ============= OPTIMIZE ENUMS =============
logger.info("\n5. Ensuring enum types exist...")
try:
# Check if enum types exist, create if missing
result = await conn.execute(text("""
SELECT EXISTS (
SELECT 1 FROM pg_type WHERE typname = 'orderstatus'
)
"""))
if not result.scalar():
logger.info(" Creating OrderStatus enum type...")
await conn.execute(text("""
CREATE TYPE orderstatus AS ENUM (
'pending', 'processing', 'layaway', 'shipped',
'delivered', 'cancelled', 'refunded', 'on_hold'
)
"""))
result = await conn.execute(text("""
SELECT EXISTS (
SELECT 1 FROM pg_type WHERE typname = 'userrole'
)
"""))
if not result.scalar():
logger.info(" Creating UserRole enum type...")
await conn.execute(text("""
CREATE TYPE userrole AS ENUM (
'user', 'admin', 'employee', 'accountant', 'sales_manager'
)
"""))
logger.info(" ✓ Enum types verified")
except Exception as e:
logger.warning(f" ⚠ Enum type check: {str(e)}")
# ============= ADD CONSTRAINTS =============
logger.info("\n6. Adding check constraints...")
constraints = [
("chk_products_price_positive", "products", "price > 0"),
("chk_products_stock_nonnegative", "products", "stock >= 0"),
("chk_services_price_positive", "services", "price > 0"),
("chk_orders_totals_nonnegative", "orders",
"subtotal >= 0 AND tax >= 0 AND shipping >= 0 AND total >= 0"),
("chk_reviews_rating_range", "reviews", "rating >= 1 AND rating <= 5"),
("chk_order_items_quantity_positive", "order_items", "quantity > 0"),
("chk_order_items_price_nonnegative", "order_items", "price >= 0"),
]
for constraint_name, table, condition in constraints:
try:
# Check if constraint exists
result = await conn.execute(text(f"""
SELECT COUNT(*) FROM pg_constraint
WHERE conname = '{constraint_name}'
"""))
if result.scalar() == 0:
await conn.execute(text(f"""
ALTER TABLE {table}
ADD CONSTRAINT {constraint_name} CHECK ({condition})
"""))
await conn.commit()
logger.info(f" ✓ Added constraint {constraint_name}")
else:
logger.info(f" ⊙ Constraint {constraint_name} already exists")
except Exception as e:
await conn.rollback()
logger.error(f" ✗ Failed to add {constraint_name}: {str(e)[:100]}")
# ============= ANALYZE TABLES =============
logger.info("\n7. Analyzing tables for statistics...")
tables = ['users', 'products', 'services', 'orders', 'order_items',
'reviews', 'bookings', 'cart_items', 'inventory_logs']
for table in tables:
try:
await conn.execute(text(f"ANALYZE {table}"))
await conn.commit()
logger.info(f" ✓ Analyzed {table}")
except Exception as e:
await conn.rollback()
logger.error(f" ✗ Failed to analyze {table}: {str(e)[:100]}")
logger.info("\n✅ Database optimization complete!")
async def verify_optimization():
"""Verify that optimizations were applied correctly"""
async with async_engine.connect() as conn:
logger.info("\n" + "="*60)
logger.info("OPTIMIZATION VERIFICATION")
logger.info("="*60)
# Count indexes
result = await conn.execute(text("""
SELECT COUNT(*) FROM pg_indexes
WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey'
"""))
index_count = result.scalar()
logger.info(f"\nTotal indexes created: {index_count}")
# Count constraints
result = await conn.execute(text("""
SELECT COUNT(*) FROM pg_constraint
WHERE contype = 'c' AND connamespace::regnamespace::text = 'public'
"""))
constraint_count = result.scalar()
logger.info(f"Total check constraints: {constraint_count}")
# List all indexes by table
result = await conn.execute(text("""
SELECT tablename, COUNT(*) as idx_count
FROM pg_indexes
WHERE schemaname = 'public' AND indexname NOT LIKE '%_pkey'
GROUP BY tablename
ORDER BY idx_count DESC
"""))
indexes_by_table = result.fetchall()
logger.info("\nIndexes per table:")
for table, count in indexes_by_table:
logger.info(f" {table:<25} {count} indexes")
# Check for any remaining unindexed foreign keys
result = await conn.execute(text("""
SELECT
c.conrelid::regclass AS table_name,
a.attname AS column_name
FROM pg_constraint c
JOIN pg_attribute a ON a.attnum = ANY(c.conkey) AND a.attrelid = c.conrelid
LEFT JOIN pg_index i ON i.indrelid = c.conrelid
AND a.attnum = ANY(i.indkey)
WHERE c.contype = 'f'
AND i.indexrelid IS NULL
"""))
unindexed_fks = result.fetchall()
if unindexed_fks:
logger.warning(f"\n⚠️ {len(unindexed_fks)} foreign keys still without indexes:")
for table, column in unindexed_fks:
logger.warning(f" {table}.{column}")
else:
logger.info("\n✓ All foreign keys are properly indexed!")
logger.info("\n" + "="*60)
async def main():
"""Main optimization workflow"""
try:
await optimize_database()
await verify_optimization()
logger.info("\n🎉 Database optimization successful!")
logger.info("\nRecommendations:")
logger.info(" 1. Monitor query performance with pg_stat_statements")
logger.info(" 2. Run VACUUM ANALYZE periodically")
logger.info(" 3. Consider partitioning large tables (orders, inventory_logs)")
logger.info(" 4. Set up connection pooling for production")
except Exception as e:
logger.error(f"\n❌ Optimization failed: {str(e)}")
raise
if __name__ == "__main__":
asyncio.run(main())

backend/requirements.txt Normal file

@@ -0,0 +1,131 @@
aiofiles==25.1.0
aiohappyeyeballs==2.6.1
aiohttp==3.13.3
aiosignal==1.4.0
annotated-types==0.7.0
anyio==4.12.0
asyncpg==0.31.0
attrs==25.4.0
bcrypt==4.1.3
black==25.12.0
boto3==1.42.21
botocore==1.42.21
certifi==2026.1.4
cffi==2.0.0
charset-normalizer==3.4.4
click==8.3.1
cryptography==46.0.3
distro==1.9.0
dnspython==2.8.0
ecdsa==0.19.1
email-validator==2.3.0
fastapi==0.110.1
fastuuid==0.14.0
filelock==3.20.2
flake8==7.3.0
frozenlist==1.8.0
fsspec==2025.12.0
google-ai-generativelanguage==0.6.15
google-api-core==2.29.0
google-api-python-client==2.187.0
google-auth==2.47.0
google-auth-httplib2==0.3.0
google-genai==1.57.0
google-generativeai==0.8.6
googleapis-common-protos==1.72.0
greenlet==3.3.0
grpcio==1.76.0
grpcio-status==1.71.2
h11==0.16.0
hf-xet==1.2.0
httpcore==1.0.9
httplib2==0.31.0
httpx==0.28.1
huggingface_hub==1.2.4
idna==3.11
importlib_metadata==8.7.1
iniconfig==2.3.0
isort==7.0.0
Jinja2==3.1.6
jiter==0.12.0
jmespath==1.0.1
jq==1.10.0
jsonschema==4.26.0
jsonschema-specifications==2025.9.1
librt==0.7.7
litellm==1.80.0
markdown-it-py==4.0.0
MarkupSafe==3.0.3
mccabe==0.7.0
mdurl==0.1.2
motor==3.3.1
multidict==6.7.0
mypy==1.19.1
mypy_extensions==1.1.0
numpy==2.4.0
oauthlib==3.3.1
openai==1.99.9
packaging==25.0
pandas==2.3.3
passlib==1.7.4
pathspec==0.12.1
pillow==12.1.0
platformdirs==4.5.1
pluggy==1.6.0
propcache==0.4.1
proto-plus==1.27.0
protobuf==5.29.5
psycopg2-binary==2.9.11
pyasn1==0.6.1
pyasn1_modules==0.4.2
pycodestyle==2.14.0
pycparser==2.23
pydantic==2.12.5
pydantic_core==2.41.5
pyflakes==3.4.0
Pygments==2.19.2
PyJWT==2.10.1
pymongo==4.5.0
pyparsing==3.3.1
pytest==9.0.2
python-dateutil==2.9.0.post0
python-dotenv==1.2.1
python-jose==3.5.0
python-multipart==0.0.21
pytokens==0.3.0
pytz==2025.2
PyYAML==6.0.3
referencing==0.37.0
regex==2025.11.3
reportlab==4.4.7
requests==2.32.5
requests-oauthlib==2.0.0
rich==14.2.0
rpds-py==0.30.0
rsa==4.9.1
s3transfer==0.16.0
s5cmd==0.2.0
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
SQLAlchemy==2.0.45
starlette==0.37.2
stripe==14.1.0
tenacity==9.1.2
tiktoken==0.12.0
tokenizers==0.22.2
tqdm==4.67.1
typer==0.21.0
typer-slim==0.21.1
typing-inspection==0.4.2
typing_extensions==4.15.0
tzdata==2025.3
uritemplate==4.2.0
urllib3==2.6.2
uvicorn==0.25.0
watchfiles==1.1.1
websockets==15.0.1
yarl==1.22.0
zipp==3.23.0
pillow-heif>=0.13.0

backend/seed_about_page.py Normal file

@@ -0,0 +1,153 @@
"""
Seed About Page Data
Populates the about_content, team_members, and company_values tables
with existing data from the frontend About.js page
"""
import asyncio
from database import AsyncSessionLocal
from models import AboutContent, TeamMember, CompanyValue
from sqlalchemy import select
async def seed_about_data():
async with AsyncSessionLocal() as db:
# Check if data already exists
result = await db.execute(select(TeamMember))
if result.scalars().first():
print("❌ About page data already exists. Skipping seed.")
return
print("🌱 Seeding About page data...")
# 1. Hero Section Content
hero = AboutContent(
section="hero",
title="Your Trusted Tech Partner",
subtitle="About PromptTech Solutions",
content="<p>Founded in 2020, PromptTech Solutions has grown from a small repair shop to a comprehensive tech solutions provider. We combine quality products with expert services to deliver the best tech experience.</p>",
display_order=0,
is_active=True
)
db.add(hero)
# 2. Our Story Content
story = AboutContent(
section="story",
title="Our Story",
subtitle="",
content="""<p>PromptTech Solutions started with a simple vision: to make quality tech accessible and provide expert support that customers can trust. What began as a small phone repair shop has evolved into a full-service tech destination.</p>
<p>Our team of certified technicians brings decades of combined experience in electronics repair, from smartphones to laptops and everything in between. We've helped thousands of customers bring their devices back to life.</p>
<p>Today, we're proud to offer a curated selection of premium electronics alongside our repair services. Every product we sell meets our high standards for quality, and every repair we do is backed by our satisfaction guarantee.</p>""",
display_order=0,
is_active=True
)
db.add(story)
# 3. Stats Section
stats = AboutContent(
section="stats",
title="",
subtitle="",
content="",
data={
"stats": [
{"label": "Happy Customers", "value": "50,000+"},
{"label": "Products Sold", "value": "10,000+"},
{"label": "Repairs Completed", "value": "25,000+"},
{"label": "Years Experience", "value": "5+"}
]
},
display_order=0,
is_active=True
)
db.add(stats)
# 4. Team Members
team_members = [
TeamMember(
name="Alex Johnson",
role="Founder & CEO",
bio="<p>Alex founded PromptTech Solutions with a vision to make quality tech accessible to everyone.</p>",
image_url="https://images.unsplash.com/photo-1507003211169-0a1dd7228f2d?w=400",
email="",
linkedin="",
display_order=0,
is_active=True
),
TeamMember(
name="Sarah Williams",
role="Head of Operations",
bio="<p>Sarah ensures smooth operations and exceptional customer service across all our locations.</p>",
image_url="https://images.unsplash.com/photo-1494790108377-be9c29b29330?w=400",
email="",
linkedin="",
display_order=1,
is_active=True
),
TeamMember(
name="Mike Chen",
role="Lead Technician",
bio="<p>Mike leads our team of certified technicians with over 15 years of electronics repair experience.</p>",
image_url="https://images.unsplash.com/photo-1472099645785-5658abf4ff4e?w=400",
email="",
linkedin="",
display_order=2,
is_active=True
),
TeamMember(
name="Emily Davis",
role="Customer Success",
bio="<p>Emily is dedicated to ensuring every customer has an outstanding experience with PromptTech Solutions.</p>",
image_url="https://images.unsplash.com/photo-1438761681033-6461ffad8d80?w=400",
email="",
linkedin="",
display_order=3,
is_active=True
),
]
for member in team_members:
db.add(member)
# 5. Company Values
values = [
CompanyValue(
title="Quality First",
description="We never compromise on the quality of our products and services.",
icon="🎯",
display_order=0,
is_active=True
),
CompanyValue(
title="Customer Focus",
description="Your satisfaction is our top priority. We listen and deliver.",
icon="👥",
display_order=1,
is_active=True
),
CompanyValue(
title="Excellence",
description="We strive for excellence in everything we do.",
icon="🏆",
display_order=2,
is_active=True
),
CompanyValue(
title="Integrity",
description="Honest, transparent, and ethical business practices.",
icon="❤️",
display_order=3,
is_active=True
),
]
for value in values:
db.add(value)
await db.commit()
print("✅ About page data seeded successfully!")
print(f" - 3 content sections created")
print(f" - 4 team members created")
print(f" - 4 company values created")
if __name__ == "__main__":
asyncio.run(seed_about_data())


@@ -0,0 +1,40 @@
import asyncio
from database import AsyncSessionLocal
from models import Category
def create_slug(name: str) -> str:
"""Convert name to slug"""
return name.lower().replace(" ", "-").replace("&", "and")
async def seed_categories():
"""Seed initial categories"""
default_categories = [
{"name": "Phones", "slug": "phones", "description": "Smartphones and mobile devices"},
{"name": "Laptops", "slug": "laptops", "description": "Portable computers and notebooks"},
{"name": "Tablets", "slug": "tablets", "description": "Tablet devices and e-readers"},
{"name": "Wearables", "slug": "wearables", "description": "Smartwatches and fitness trackers"},
{"name": "Accessories", "slug": "accessories", "description": "Tech accessories and peripherals"},
{"name": "Gaming", "slug": "gaming", "description": "Gaming consoles and accessories"},
{"name": "Audio", "slug": "audio", "description": "Headphones, speakers, and audio equipment"},
]
async with AsyncSessionLocal() as db:
# Check if categories already exist
from sqlalchemy import select
result = await db.execute(select(Category))
existing = result.scalars().all()
if existing:
print(f"✓ Categories already exist ({len(existing)} found)")
return
# Add categories
for cat_data in default_categories:
category = Category(**cat_data)
db.add(category)
await db.commit()
print(f"✓ Seeded {len(default_categories)} categories successfully")
if __name__ == "__main__":
asyncio.run(seed_categories())

backend/server.log Normal file (54954 lines)
File diff suppressed because it is too large.

backend/server.py Normal file (2759 lines)
File diff suppressed because it is too large.

backend/techzone.db Normal file (binary, not shown)

backend/test_image_upload.sh Executable file

@@ -0,0 +1,43 @@
#!/bin/bash
# Test image upload endpoint
# First, login as admin to get token
echo "=== Testing Image Upload ==="
# Login as admin
TOKEN=$(curl -s -X POST http://localhost:8181/api/auth/login \
-H "Content-Type: application/json" \
-d '{"email":"admin@techzone.com","password":"admin123"}' | python3 -c "import sys, json; print(json.load(sys.stdin)['access_token'])" 2>/dev/null)
if [ -z "$TOKEN" ]; then
echo "❌ Failed to get admin token"
exit 1
fi
echo "✓ Got admin token"
# Create a test image file
echo "Creating test image..."
convert -size 100x100 xc:blue /tmp/test_product_image.jpg 2>/dev/null || {
# Fallback if imagemagick not available
echo "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==" | base64 -d > /tmp/test_product_image.png
}
# Try to upload the image
echo "Uploading image..."
RESPONSE=$(curl -s -X POST http://localhost:8181/api/upload/image \
-H "Authorization: Bearer $TOKEN" \
-F "file=@/tmp/test_product_image.jpg" 2>&1)
echo "Response: $RESPONSE"
if echo "$RESPONSE" | grep -q "url"; then
echo "✓ Image upload successful!"
else
echo "❌ Image upload failed"
echo "Full response: $RESPONSE"
fi
# Cleanup
rm -f /tmp/test_product_image.jpg /tmp/test_product_image.png

backend/test_upload.py Normal file

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""Test script to verify image upload functionality"""
import asyncio
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from database import AsyncSessionLocal
from models import User, UserRole
from sqlalchemy import select
import bcrypt
import jwt
from datetime import datetime, timezone, timedelta
SECRET_KEY = 'techzone-super-secret-key-2024-production'
ALGORITHM = "HS256"
async def test_upload():
"""Test that we can generate a valid admin token"""
async with AsyncSessionLocal() as db:
# Find admin user
result = await db.execute(
select(User).where(User.email == "admin@techzone.com")
)
admin = result.scalar_one_or_none()
if not admin:
print("❌ Admin user not found!")
print("Creating admin user...")
hashed_password = bcrypt.hashpw("admin123".encode(), bcrypt.gensalt())
admin = User(
email="admin@techzone.com",
name="Admin",
password=hashed_password.decode(),  # the User column is "password", not "password_hash"
role=UserRole.ADMIN
)
db.add(admin)
await db.commit()
await db.refresh(admin)
print("✅ Admin user created")
# Generate token
expires = datetime.now(timezone.utc) + timedelta(hours=24)
token_data = {
"sub": admin.id,
"email": admin.email,
"exp": expires
}
token = jwt.encode(token_data, SECRET_KEY, algorithm=ALGORITHM)
print("\n" + "="*60)
print("ADMIN TOKEN FOR TESTING:")
print("="*60)
print(token)
print("="*60)
print("\nYou can use this token to test image upload:")
print(f'\ncurl -X POST http://localhost:8181/api/upload/image \\')
print(f' -H "Authorization: Bearer {token}" \\')
print(f' -F "file=@/path/to/your/image.jpg"')
print("\n")
if __name__ == "__main__":
asyncio.run(test_upload())


@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
Comprehensive test for image upload functionality
Tests various scenarios to ensure robust upload handling
"""
import requests
import sys
from pathlib import Path
# Add backend to path
sys.path.insert(0, str(Path(__file__).parent))
from test_upload import SECRET_KEY, ALGORITHM
import jwt
from datetime import datetime, timezone, timedelta
# Generate admin token
def get_admin_token():
expires = datetime.now(timezone.utc) + timedelta(hours=24)
token_data = {
"sub": "0739d174-9409-4bd1-b749-69f9e5546467f", # Admin user ID
"email": "admin@techzone.com",
"exp": expires
}
return jwt.encode(token_data, SECRET_KEY, algorithm=ALGORITHM)
def test_upload(filename, content, content_type=None):
"""Test uploading a file"""
token = get_admin_token()
files = {'file': (filename, content, content_type)}
headers = {'Authorization': f'Bearer {token}'}
print(f"\n{'='*60}")
print(f"Testing upload: {filename}")
print(f"Content-Type: {content_type}")
print(f"Size: {len(content)} bytes")
print(f"{'='*60}")
try:
response = requests.post(
'http://localhost:8181/api/upload/image',
files=files,
headers=headers
)
if response.status_code == 200:
print(f"✅ SUCCESS: {response.json()}")
return True
else:
print(f"❌ FAILED ({response.status_code}): {response.json()}")
return False
except Exception as e:
print(f"❌ ERROR: {e}")
return False
if __name__ == "__main__":
# Test 1: Real JPEG with proper content type
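# Assumes a real JPEG already exists at /tmp/test-image.jpg; create one before running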
with open('/tmp/test-image.jpg', 'rb') as f:
content = f.read()
test_upload('test.jpg', content, 'image/jpeg')
# Test 2: JPEG with no content type (browser might do this)
test_upload('test2.jpg', content, None)
# Test 3: PNG extension
test_upload('test.png', content, 'image/png')
# Test 4: File with uppercase extension
test_upload('TEST.JPG', content, 'image/jpeg')
# Test 5: No extension but has content type
test_upload('image', content, 'image/jpeg')
# Test 6: Invalid extension
test_upload('file.txt', b'not an image', 'text/plain')
print(f"\n{'='*60}")
print("Test complete!")
print(f"{'='*60}\n")

Binary image file not shown (2.6 MiB).


@@ -0,0 +1 @@
Test file content

Binary image file not shown (1.3 KiB).

Binary image file not shown (2.6 MiB).

Binary image file not shown (1.3 KiB).