Files

1676 lines
64 KiB
Python

from flask import Flask, jsonify, request, make_response, session
from flask_cors import CORS
from flask_caching import Cache
from flask_compress import Compress
from datetime import datetime, timedelta
from postgresql_models import init_db, SessionLocal, Profile, Song, Plan, PlanSong, ProfileSong, ProfileSongKey, BiometricCredential, User
from functools import lru_cache
import hashlib
import hmac
import base64
import uuid
try:
from dotenv import load_dotenv
load_dotenv() # Loads variables from a .env file if present
except ImportError:
# dotenv not installed yet; proceed without loading .env
pass
import os
import json
import secrets
import logging
import re
# Import bleach if available, but not required for basic operation
try:
import bleach
BLEACH_AVAILABLE = True
except ImportError:
BLEACH_AVAILABLE = False
logger.warning('bleach not installed - using basic XSS protection')
from rate_limiter import rate_limit, get_rate_limit_headers
from validators import validate_request_data, PROFILE_SCHEMA, SONG_SCHEMA, PLAN_SCHEMA, sanitize_filename
from biometric_auth import (
register_biometric_credential, authenticate_biometric_by_device,
get_user_biometric_status, delete_all_user_biometrics
)
from helpers import (
success_response, error_response, not_found_response, validation_error,
sanitize_text, validate_id, serialize_profile, serialize_song, serialize_plan,
extract_profile_data, extract_song_data, extract_plan_data, search_songs,
update_model_fields, safe_db_operation, NotFoundError, get_or_404
)
# Configure logging
log_dir = os.path.join(os.path.dirname(__file__), 'logs')
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(log_dir, 'app.log')),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
# Generate strong secret key for sessions
app.secret_key = os.environ.get('SECRET_KEY') or secrets.token_hex(32)
# API Key Authentication (for backend-to-backend calls)
API_KEY = os.environ.get('API_KEY') or secrets.token_hex(32)
if os.environ.get('API_KEY'):
logger.info('API_KEY loaded from environment')
else:
logger.warning('API_KEY generated at runtime - set in .env for production')
# CSRF Token storage (in-memory for simplicity, use Redis in production)
csrf_tokens = {}
# Performance Optimization - In-Memory Caching with TTL
cache = Cache(app, config={
'CACHE_TYPE': 'SimpleCache',
'CACHE_DEFAULT_TIMEOUT': 300, # 5 minutes default
'CACHE_THRESHOLD': 500 # Max cached items
})
# Performance Optimization - Gzip Compression
Compress(app)
app.config['COMPRESS_MIMETYPES'] = ['text/html', 'text/css', 'text/xml', 'application/json', 'application/javascript']
app.config['COMPRESS_LEVEL'] = 6 # Balance between speed and compression
app.config['COMPRESS_MIN_SIZE'] = 500 # Only compress responses > 500 bytes
# Session Security Settings (applies to all environments)
app.config['SESSION_COOKIE_SECURE'] = not app.debug # HTTPS only in production
app.config['SESSION_COOKIE_HTTPONLY'] = True # Prevent JavaScript access
app.config['SESSION_COOKIE_SAMESITE'] = 'Strict' # CSRF protection
app.config['PERMANENT_SESSION_LIFETIME'] = 3600 # 1 hour
app.config['SESSION_COOKIE_NAME'] = '__Host-session' if not app.debug else 'session' # Secure prefix
# Request size limits
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max request size
# Security Headers Middleware
@app.after_request
def set_security_headers(response):
"""Add security headers to all responses"""
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
# Add rate limit headers
client_id = request.remote_addr or 'unknown'
rate_headers = get_rate_limit_headers(client_id)
for key, value in rate_headers.items():
response.headers[key] = value
# Remove server header to prevent information disclosure
response.headers.pop('Server', None)
# Add CSP header
response.headers['Content-Security-Policy'] = "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'"
# Add caching headers based on endpoint type
if request.path.startswith('/static/') or request.path.endswith(('.js', '.css', '.png', '.jpg', '.svg')):
response.headers['Cache-Control'] = 'public, max-age=31536000, immutable' # 1 year for static assets
elif request.path in ['/api/health', '/api/providers']:
response.headers['Cache-Control'] = 'public, max-age=60' # 1 minute for health/status
elif request.method == 'GET' and request.path.startswith('/api/'):
response.headers['Cache-Control'] = 'private, max-age=30, must-revalidate' # 30 seconds for API reads
elif request.method in ['POST', 'PUT', 'DELETE']:
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate' # No cache for mutations
# Add ETag for GET requests to enable conditional requests
if request.method == 'GET' and response.status_code == 200:
if not response.headers.get('ETag'):
import hashlib
etag = hashlib.md5(response.get_data()).hexdigest()
response.headers['ETag'] = f'"{etag}"'
# Check If-None-Match header
if request.headers.get('If-None-Match') == response.headers['ETag']:
response.status_code = 304
response.set_data(b'')
return response
# CORS configuration - restricted to known origins
ALLOWED_ORIGINS = [
"http://localhost:5100",
"https://houseofprayer.ddns.net"
]
# Add development IP if in dev mode
if os.environ.get('FLASK_ENV') != 'production':
ALLOWED_ORIGINS.extend([
"http://192.168.10.130:5100",
"http://localhost:8080"
])
CORS(app, resources={
r"/api/*": {
"origins": ALLOWED_ORIGINS,
"methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
"allow_headers": ["Content-Type", "Authorization"],
"supports_credentials": True,
"max_age": 3600
}
})
init_db()
FLASK_PORT = int(os.environ.get('FLASK_PORT', '8080'))
# Authentication and Authorization Functions
def generate_csrf_token():
"""Generate a CSRF token for the session"""
if 'csrf_token' not in session:
session['csrf_token'] = secrets.token_hex(32)
return session['csrf_token']
def verify_csrf_token():
"""Verify CSRF token from request"""
token = request.headers.get('X-CSRF-Token') or request.form.get('csrf_token')
if not token or not session.get('csrf_token'):
return False
return hmac.compare_digest(token, session['csrf_token'])
def require_api_key(f):
"""Decorator to require API key for administrative endpoints"""
from functools import wraps
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key or not hmac.compare_digest(api_key, API_KEY):
logger.warning(f'Unauthorized API access attempt from {request.remote_addr}')
return jsonify({'error': 'unauthorized', 'message': 'Invalid or missing API key'}), 401
return f(*args, **kwargs)
return decorated_function
def require_csrf(f):
"""Decorator to require CSRF token for state-changing operations"""
from functools import wraps
@wraps(f)
def decorated_function(*args, **kwargs):
if request.method in ['POST', 'PUT', 'DELETE', 'PATCH']:
if not verify_csrf_token():
logger.warning(f'CSRF validation failed from {request.remote_addr}')
return jsonify({'error': 'csrf_validation_failed', 'message': 'Invalid or missing CSRF token'}), 403
return f(*args, **kwargs)
return decorated_function
def require_auth(f):
"""Decorator to require authentication"""
from functools import wraps
@wraps(f)
def decorated_function(*args, **kwargs):
if 'username' not in session:
logger.warning(f'Unauthorized access attempt from {request.remote_addr}')
return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401
return f(*args, **kwargs)
return decorated_function
def require_permission(permission):
"""Decorator to require specific permission"""
from functools import wraps
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
username = session.get('username')
if not username:
return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401
db = get_db()
try:
user = db.query(User).filter(User.username == username).first()
if not user or not user.has_permission(permission):
logger.warning(f'Permission denied for {username} on {request.endpoint}')
return jsonify({'error': 'forbidden', 'message': f'Permission "{permission}" required'}), 403
finally:
pass
return f(*args, **kwargs)
return decorated_function
return decorator
# CSRF Token endpoint
@app.route('/api/csrf-token', methods=['GET'])
def get_csrf_token():
"""Get CSRF token for the session"""
token = generate_csrf_token()
return jsonify({'csrf_token': token})
# Validate critical environment variables in production
if not app.debug:
required_vars = ['POSTGRESQL_URI', 'SECRET_KEY']
missing = [var for var in required_vars if not os.environ.get(var)]
if missing:
print(f"WARNING: Missing critical environment variables: {', '.join(missing)}")
print("Please configure these in your .env file for production use.")
def get_db():
"""Create a new database session with automatic cleanup
IMPORTANT: Do NOT call db.close() in finally blocks when using scoped_session.
The teardown_appcontext handler automatically cleans up sessions.
Calling close() manually causes connection pool exhaustion.
"""
return SessionLocal()
@app.teardown_appcontext
def cleanup_session(exception=None):
"""Cleanup scoped database sessions after each request to prevent connection pool exhaustion
This is automatically called by Flask after every request, ensuring proper
session cleanup without manual intervention in route handlers.
"""
try:
SessionLocal.remove()
except Exception as e:
logger.error(f"Error during session cleanup: {e}")
# Force cleanup on error
try:
engine.dispose()
except:
pass
def extract_text_from_file(file_storage):
"""Extract text from an uploaded file (docx, pdf, image, txt)."""
filename = file_storage.filename or ''
lower = filename.lower()
data = file_storage.read()
# Reset stream position if needed for certain libraries
import io
stream = io.BytesIO(data)
text = ''
try:
if lower.endswith('.txt'):
text = data.decode(errors='ignore')
elif lower.endswith('.docx'):
try:
import docx
doc = docx.Document(stream)
text = '\n'.join(p.text for p in doc.paragraphs)
except Exception:
text = ''
elif lower.endswith('.pdf'):
try:
import PyPDF2
reader = PyPDF2.PdfReader(stream)
parts = []
for page in reader.pages[:20]: # safety limit
try:
parts.append(page.extract_text() or '')
except Exception:
continue
text = '\n'.join(parts)
# OCR fallback if minimal text extracted (likely scanned PDF)
if len(text.strip()) < 50:
try:
from pdf2image import convert_from_bytes
from PIL import Image
import pytesseract
images = convert_from_bytes(data, first_page=1, last_page=min(10, len(reader.pages)))
ocr_parts = []
for img in images:
ocr_parts.append(pytesseract.image_to_string(img))
ocr_text = '\n'.join(ocr_parts)
if len(ocr_text.strip()) > len(text.strip()):
text = ocr_text
except Exception:
pass # Fall back to original text layer extraction
except Exception:
text = ''
elif lower.endswith(('.png','.jpg','.jpeg','.tif','.tiff')):
try:
from PIL import Image
import pytesseract
img = Image.open(stream)
text = pytesseract.image_to_string(img)
except Exception:
text = ''
else:
# Attempt generic decode
try:
text = data.decode(errors='ignore')
except Exception:
text = ''
except Exception:
text = ''
# Basic cleanup
cleaned = '\n'.join(line.rstrip() for line in (text or '').splitlines())
return cleaned.strip()
# Admin restore from data.json (profiles and songs)
@app.route('/api/admin/restore', methods=['POST'])
@rate_limit(max_per_minute=5)
@require_api_key # Requires API key for admin operations
def admin_restore():
logger.info(f'Admin restore initiated from {request.remote_addr}')
# Determine data.json location: prefer backend/data.json, fallback to project root
backend_path = os.path.join(os.path.dirname(__file__), 'data.json')
root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data.json'))
source_path = backend_path if os.path.exists(backend_path) else root_path if os.path.exists(root_path) else None
if not source_path:
return jsonify({ 'ok': False, 'error': 'data.json not found in backend/ or project root' }), 404
try:
with open(source_path, 'r', encoding='utf-8') as f:
payload = json.load(f)
except Exception as e:
return jsonify({ 'ok': False, 'error': f'Failed to read data.json: {str(e)}' }), 400
# Validate payload structure
if not isinstance(payload, dict):
return jsonify({ 'ok': False, 'error': 'Invalid data format' }), 400
db = get_db()
created = { 'profiles': 0, 'songs': 0 }
updated = { 'profiles': 0, 'songs': 0 }
try:
import uuid
# Restore profiles
profiles = payload.get('profiles', [])
for p in profiles:
name = (p.get('name') or f"{(p.get('first_name') or '').strip()} {(p.get('last_name') or '').strip()}" ).strip()
if not name:
continue
existing = db.query(Profile).filter(Profile.name == name).first()
if existing:
changed = False
for key in ['email','contact_number','default_key','notes']:
if p.get(key) and getattr(existing, key, None) != p.get(key):
setattr(existing, key, p.get(key))
changed = True
if changed:
updated['profiles'] += 1
else:
profile_id = str(p.get('id')) if p.get('id') else str(uuid.uuid4())
new_p = Profile(
id=profile_id,
name=name,
email=p.get('email') or '',
contact_number=p.get('contact_number') or '',
default_key=p.get('default_key') or 'C',
notes=p.get('notes') or ''
)
db.add(new_p)
created['profiles'] += 1
# Restore songs
songs = payload.get('songs', [])
for s in songs:
title = (s.get('title') or '').strip()
if not title:
continue
existing = db.query(Song).filter(Song.title == title).first()
lyrics = s.get('lyrics') or s.get('content') or ''
chords = s.get('chords') or ''
singer = s.get('singer') or s.get('artist') or ''
artist = s.get('artist') or ''
band = s.get('band') or ''
if existing:
changed = False
if lyrics and (existing.lyrics or '') != lyrics:
existing.lyrics = lyrics
changed = True
if chords and (existing.chords or '') != chords:
existing.chords = chords
changed = True
if singer and (existing.singer or '') != singer:
existing.singer = singer
changed = True
if artist and (existing.artist or '') != artist:
existing.artist = artist
changed = True
if band and (existing.band or '') != band:
existing.band = band
changed = True
if changed:
updated['songs'] += 1
else:
song_id = str(s.get('id')) if s.get('id') else str(uuid.uuid4())
new_s = Song(id=song_id, title=title, artist=artist, band=band, singer=singer, lyrics=lyrics, chords=chords)
db.add(new_s)
created['songs'] += 1
db.commit()
return jsonify({ 'ok': True, 'profiles_created': created['profiles'], 'profiles_updated': updated['profiles'], 'songs_created': created['songs'], 'songs_updated': updated['songs'], 'source': source_path })
except Exception as e:
db.rollback()
return jsonify({ 'ok': False, 'error': str(e) }), 500
finally:
pass # Session cleanup handled by teardown_appcontext
@app.route('/api/upload_lyric', methods=['POST'])
@rate_limit(max_per_minute=10)
def upload_lyric():
"""Accept a lyric file and return extracted text. Does NOT create a Song automatically."""
if 'file' not in request.files:
return jsonify({'error':'file_missing'}), 400
f = request.files['file']
if not f.filename:
return jsonify({'error':'empty_filename'}), 400
# Sanitize filename to prevent path traversal attacks
safe_filename = sanitize_filename(f.filename)
# Validate file extension
allowed_extensions = {'.txt', '.docx', '.pdf', '.png', '.jpg', '.jpeg', '.tif', '.tiff'}
file_ext = os.path.splitext(safe_filename)[1].lower()
if file_ext not in allowed_extensions:
logger.warning(f'Invalid file upload attempt: {file_ext} from {request.remote_addr}')
return jsonify({'error':'invalid_file_type', 'message':'Only text, docx, pdf, and image files are allowed'}), 400
# Validate file size (max 10MB)
f.seek(0, 2) # Seek to end
file_size = f.tell()
f.seek(0) # Reset to beginning
if file_size > 10 * 1024 * 1024:
return jsonify({'error':'file_too_large', 'max_size': '10MB'}), 413
# Validate filename to prevent path traversal
if '..' in f.filename or '/' in f.filename or '\\' in f.filename:
return jsonify({'error':'invalid_filename'}), 400
# Optional metadata fields from form
title = (request.form.get('title') or f.filename.rsplit('.',1)[0])[:500] # Limit length
artist = (request.form.get('artist') or '')[:500]
band = (request.form.get('band') or '')[:500]
extracted = extract_text_from_file(f)
if not extracted:
return jsonify({'error':'extraction_failed','title':title}), 422
# Return without saving; front-end will present in modal for editing & saving
sample = extracted[:150] + ('...' if len(extracted) > 150 else '')
return jsonify({
'status':'ok',
'title': title,
'artist': artist,
'band': band,
'lyrics': extracted,
'preview': sample,
'length': len(extracted)
})
@app.route('/api/providers')
@rate_limit(max_per_minute=60)
def providers():
"""Report which external provider tokens are configured (OpenAI removed)."""
return jsonify({
'local_db': True,
'chartlyrics': True,
'lifeway_link': True
})
@app.route('/')
def index():
return jsonify({'message': 'House of Prayer Song Lyrics API (Flask)', 'port': FLASK_PORT})
@app.route('/api/health')
def health():
return jsonify({'status': 'ok', 'ts': datetime.utcnow().isoformat()})
# Profile Selection Management
@app.route('/api/profile-selection/clear', methods=['POST', 'OPTIONS'])
def clear_profile_selection():
"""Clear profile selection (frontend manages this in localStorage, but endpoint needed for API compatibility)"""
if request.method == 'OPTIONS':
return '', 204
return jsonify({'status': 'ok', 'message': 'Profile selection cleared'})
# Profiles CRUD
@app.route('/api/profiles', methods=['GET','POST'])
@rate_limit(max_per_minute=600)
@require_auth
def profiles():
db = get_db()
try:
if request.method == 'GET':
# Cache profiles list for 5 minutes
cache_key = f'profiles_list_{session.get("username", "anonymous")}'
cached = cache.get(cache_key)
if cached is not None:
return jsonify(cached)
items = db.query(Profile).all()
result = [serialize_profile(p, include_song_count=True, db=db) for p in items]
cache.set(cache_key, result, timeout=300)
return jsonify(result)
import uuid
data = request.get_json() or {}
# Validate name
profile_data = extract_profile_data(data)
if not profile_data['name']:
return validation_error('name', 'Name is required')
profile_id = data.get('id') or str(uuid.uuid4())
p = Profile(id=profile_id, **profile_data)
db.add(p)
db.commit()
# Invalidate cache
cache_key = f'profiles_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Profile created: {profile_id} from {request.remote_addr}')
return jsonify(serialize_profile(p, include_song_count=True, db=db))
except Exception as e:
db.rollback()
logger.error(f'Error in profiles endpoint: {e}')
return error_response('server_error', str(e), 500)
finally:
pass # Session cleanup handled by teardown_appcontext
@app.route('/api/profiles/<pid>', methods=['PUT','DELETE'])
@rate_limit(max_per_minute=30)
@require_auth
@require_permission('edit')
def profile_item(pid):
# Validate ID
if not validate_id(pid):
return validation_error('id', 'Invalid profile ID format')
db = get_db()
try:
p = db.query(Profile).filter(Profile.id == pid).first()
if not p:
return not_found_response('Profile')
if request.method == 'PUT':
d = request.get_json() or {}
profile_data = extract_profile_data(d)
update_model_fields(p, profile_data)
db.commit()
# Invalidate cache
cache_key = f'profiles_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Profile updated: {pid}')
return jsonify(serialize_profile(p, include_song_count=True, db=db))
# DELETE
db.delete(p)
db.commit()
# Invalidate cache
cache_key = f'profiles_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Profile deleted: {pid}')
return success_response({'status': 'deleted'})
except Exception as e:
db.rollback()
logger.error(f'Error in profile_item: {e}')
return error_response('server_error', str(e), 500)
finally:
pass # Session cleanup handled by teardown_appcontext
# Songs CRUD + search (local)
@app.route('/api/songs', methods=['GET','POST'])
@rate_limit(max_per_minute=300)
@require_auth
def songs():
db = get_db()
try:
if request.method == 'GET':
q = (request.args.get('q','') or '')[:500]
items = search_songs(db, Song, q)
# Include lyrics preview (first 200 chars) for Database component
return jsonify([serialize_song(s, include_full_content=False) for s in items])
import uuid
d = request.get_json() or {}
# Validate and extract song data
song_data = extract_song_data(d)
if not song_data['title'].strip():
return validation_error('title', 'Title is required')
song_id = d.get('id') or str(uuid.uuid4())
s = Song(id=song_id, **song_data)
db.add(s)
db.commit()
# Invalidate cache
cache_key = f'songs_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Song created: {song_id}')
return jsonify({'id': s.id})
except Exception as e:
db.rollback()
logger.error(f'Error in songs endpoint: {e}')
return error_response('server_error', str(e), 500)
finally:
pass # Session cleanup handled by teardown_appcontext
@app.route('/api/songs/<sid>', methods=['GET','PUT','DELETE'])
@rate_limit(max_per_minute=30)
@require_auth
def song_item(sid):
# Validate ID
if not validate_id(sid):
return validation_error('id', 'Invalid song ID format')
db = get_db()
try:
s = db.query(Song).filter(Song.id == sid).first()
if not s:
return not_found_response('Song')
if request.method == 'GET':
# Detach from session to prevent lazy loading
db.expunge(s)
return jsonify(serialize_song(s, include_full_content=True))
if request.method == 'PUT':
d = request.get_json() or {}
song_data = extract_song_data(d)
update_model_fields(s, song_data)
s.updated_at = int(datetime.utcnow().timestamp())
db.commit()
# Invalidate cache
cache_key = f'songs_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Song updated: {sid}')
return success_response()
# DELETE
db.delete(s)
db.commit()
# Invalidate cache
cache_key = f'songs_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Song deleted: {sid}')
return success_response({'status': 'deleted'})
except Exception as e:
db.rollback()
logger.error(f'Error in song_item: {e}')
return error_response('server_error', str(e), 500)
finally:
pass # Session cleanup handled by teardown_appcontext
# Planning (date-based)
@app.route('/api/plans', methods=['GET','POST'])
@rate_limit(max_per_minute=300)
@require_auth
def plans():
db = get_db()
try:
if request.method == 'GET':
# Cache plans list for 3 minutes (more volatile data)
cache_key = f'plans_list_{session.get("username", "anonymous")}'
cached = cache.get(cache_key)
if cached is not None:
return jsonify(cached)
items = db.query(Plan).all()
result = [serialize_plan(p) for p in items]
cache.set(cache_key, result, timeout=180)
return jsonify(result)
import uuid
d = request.get_json() or {}
plan_data = extract_plan_data(d)
plan_id = d.get('id') or str(uuid.uuid4())
# Validate date format (basic check)
if plan_data['date'] and len(plan_data['date']) > 50:
return validation_error('date', 'Invalid date format')
plan = Plan(id=plan_id, **plan_data)
db.add(plan)
db.commit()
# Invalidate cache
cache_key = f'plans_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Plan created: {plan_id}')
return jsonify({'id': plan.id})
except Exception as e:
db.rollback()
logger.error(f'Error in plans endpoint: {e}')
return error_response('server_error', str(e), 500)
finally:
pass # Session cleanup handled by teardown_appcontext
@app.route('/api/plans/<pid>', methods=['GET','PUT','DELETE'])
@rate_limit(max_per_minute=300)
@require_auth
def plan_detail(pid):
if not validate_id(pid):
return validation_error('id', 'Invalid plan ID format')
db = get_db()
try:
plan = db.query(Plan).filter(Plan.id == pid).first()
if not plan:
return not_found_response('Plan')
if request.method == 'GET':
return jsonify(serialize_plan(plan))
if request.method == 'PUT':
d = request.get_json() or {}
plan_data = extract_plan_data(d)
update_model_fields(plan, plan_data)
# Handle songs if provided
if 'songs' in d and isinstance(d['songs'], list):
# Delete existing plan songs
db.query(PlanSong).filter(PlanSong.plan_id==pid).delete()
# Add new songs
for idx, song in enumerate(d['songs']):
song_id = song.get('id') if isinstance(song, dict) else song
if song_id:
link = PlanSong(
id=str(uuid.uuid4()),
plan_id=pid,
song_id=str(song_id),
order_index=idx
)
db.add(link)
db.commit()
# Invalidate cache
cache_key = f'plans_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Plan updated: {pid}')
return jsonify(serialize_plan(plan))
# DELETE
# First delete all related plan songs
db.query(PlanSong).filter(PlanSong.plan_id==pid).delete()
db.delete(plan)
db.commit()
# Invalidate cache
cache_key = f'plans_list_{session.get("username", "anonymous")}'
cache.delete(cache_key)
logger.info(f'Plan deleted: {pid}')
return success_response({'status': 'deleted'})
except Exception as e:
db.rollback()
logger.error(f'Error in plan_detail: {e}')
return error_response('server_error', str(e), 500)
finally:
pass # Session cleanup handled by teardown_appcontext
@app.route('/api/plans/<pid>/songs', methods=['GET','POST'])
@rate_limit(max_per_minute=60)
def plan_songs(pid):
if not pid or len(pid) > 255:
return jsonify({'error':'invalid_id'}), 400
db = get_db()
try:
plan = db.query(Plan).filter(Plan.id == pid).first()
if not plan:
return jsonify({'error':'plan_not_found'}), 404
if request.method == 'GET':
# Optimized: Use JOIN to fetch songs in single query (avoids N+1)
results = db.query(PlanSong, Song).\
join(Song, PlanSong.song_id == Song.id).\
filter(PlanSong.plan_id == pid).\
order_by(PlanSong.order_index).\
all()
return jsonify([{
'id': plan_song.id,
'song_id': song.id,
'order_index': plan_song.order_index,
'song': {
'id': song.id,
'title': song.title,
'artist': song.artist or '',
'band': song.band or '',
'singer': song.singer or ''
}
} for plan_song, song in results])
d = request.get_json() or {}
song_id = d.get('song_id')
if not song_id:
return jsonify({'error': 'song_id_required'}), 400
order_index = int(d.get('order_index') or 0)
link_id = str(uuid.uuid4())
link = PlanSong(id=link_id, plan_id=pid, song_id=song_id, order_index=order_index)
db.add(link)
db.commit()
# Clear cache after add
# cache.delete_memoized(plan_songs, pid)
return jsonify({'id':link.id})
except Exception as e:
db.rollback()
return jsonify({'error': str(e)}), 500
finally:
pass # Session cleanup handled by teardown_appcontext
# Profile Songs endpoints
@app.route('/api/profiles/<pid>/songs', methods=['GET','POST'])
@rate_limit(max_per_minute=300)
def profile_songs(pid):
import uuid
# Validate profile ID
if not pid or len(pid) > 255:
return jsonify({'error':'invalid_profile_id'}), 400
db = get_db()
try:
profile = db.query(Profile).filter(Profile.id == pid).first()
if not profile:
return jsonify({'error':'profile_not_found'}), 404
if request.method == 'GET':
try:
# Optimized: Get all profile songs with full song data in fewer queries
links = db.query(ProfileSong).filter(ProfileSong.profile_id==pid).all()
# Get all song IDs at once
song_ids = [l.song_id for l in links]
if not song_ids:
return jsonify([])
# Fetch all songs in one query
songs = db.query(Song).filter(Song.id.in_(song_ids)).all()
songs_dict = {s.id: s for s in songs}
# Fetch all custom keys in one query
keys = db.query(ProfileSongKey).filter(
ProfileSongKey.profile_id==pid,
ProfileSongKey.song_id.in_(song_ids)
).all()
keys_dict = {k.song_id: k.song_key for k in keys}
# Build result with full song data
result = []
for l in links:
song = songs_dict.get(l.song_id)
if not song:
continue
song_key = keys_dict.get(l.song_id, profile.default_key if profile.default_key else 'C')
result.append({
'id': song.id,
'title': song.title or '',
'artist': song.artist or '',
'band': song.band or '',
'singer': song.singer or '',
'lyrics': song.lyrics or '',
'chords': song.chords or '',
'memo': song.memo or '',
'created_at': song.created_at if hasattr(song, 'created_at') else None,
'updated_at': song.updated_at if hasattr(song, 'updated_at') else None,
'song_key': song_key,
'profile_song_id': l.id
})
return jsonify(result)
except Exception as e:
logger.error(f'Error loading profile songs for {pid}: {str(e)}')
return jsonify({'error': 'failed_to_load_songs', 'message': str(e)}), 500
d = request.get_json() or {}
song_id = d.get('song_id')
# Validate song_id
if not song_id:
return jsonify({'error': 'song_id_required'}), 400
# Check if song exists
song = db.query(Song).filter(Song.id == song_id).first()
if not song:
return jsonify({'error': 'song_not_found'}), 404
# Check if association already exists
existing = db.query(ProfileSong).filter(
ProfileSong.profile_id==pid,
ProfileSong.song_id==song_id
).first()
if existing:
return jsonify({'id':existing.id, 'exists':True})
# Create profile-song association
link_id = str(uuid.uuid4())
link = ProfileSong(id=link_id, profile_id=pid, song_id=song_id)
db.add(link)
# Save custom key if provided
song_key = d.get('song_key')
if song_key:
key_id = str(uuid.uuid4())
key_record = ProfileSongKey(id=key_id, profile_id=pid, song_id=song_id, song_key=song_key)
db.add(key_record)
db.commit()
logger.info(f'Song {song_id} added to profile {pid}')
return jsonify({'id':link.id, 'success': True})
except Exception as e:
db.rollback()
logger.error(f'Error in profile_songs for {pid}: {str(e)}')
return jsonify({'error': 'operation_failed', 'message': str(e)}), 500
finally:
pass # Session cleanup handled by teardown_appcontext
@app.route('/api/profiles/<pid>/songs/<sid>', methods=['GET','PUT','DELETE'])
@rate_limit(max_per_minute=300)
def profile_song_item(pid, sid):
import uuid
db = get_db()
try:
link = db.query(ProfileSong).filter(
ProfileSong.profile_id==pid,
ProfileSong.song_id==sid
).first()
if not link:
return jsonify({'error':'not_found'}),404
if request.method == 'GET':
# Get custom key if exists
key_record = db.query(ProfileSongKey).filter(
ProfileSongKey.profile_id==pid,
ProfileSongKey.song_id==sid
).first()
profile = db.query(Profile).filter(Profile.id == pid).first()
song_key = key_record.song_key if key_record else (profile.default_key if profile else 'C')
return jsonify({
'id': link.id,
'profile_id': link.profile_id,
'song_id': link.song_id,
'song_key': song_key
})
if request.method == 'PUT':
d = request.get_json() or {}
new_key = d.get('song_key')
if new_key:
# Update or create key record
key_record = db.query(ProfileSongKey).filter(
ProfileSongKey.profile_id==pid,
ProfileSongKey.song_id==sid
).first()
if key_record:
key_record.song_key = new_key
else:
key_id = str(uuid.uuid4())
key_record = ProfileSongKey(id=key_id, profile_id=pid, song_id=sid, song_key=new_key)
db.add(key_record)
db.commit()
return jsonify({'status':'ok'})
# DELETE - remove both records
db.delete(link)
key_record = db.query(ProfileSongKey).filter(
ProfileSongKey.profile_id==pid,
ProfileSongKey.song_id==sid
).first()
if key_record:
db.delete(key_record)
db.commit()
return jsonify({'status':'deleted'})
finally:
pass # Session cleanup handled by teardown_appcontext
# External search using ChartLyrics API + Local Database
@app.route('/api/search_external')
@rate_limit(max_per_minute=20)
def search_external():
import urllib.request
import urllib.parse
import xml.etree.ElementTree as ET
q = (request.args.get('q', '') or '')[:500].strip() # Limit query length
filter_type = request.args.get('filter', 'all')
# Validate filter type
if filter_type not in ['all', 'title', 'artist', 'band']:
filter_type = 'all'
if not q:
return jsonify({'query': q, 'results': []})
results = []
# First, search local database
db = get_db()
try:
local_songs = db.query(Song).all()
def match_local(s):
q_lower = q.lower()
if filter_type == 'title':
return q_lower in (s.title or '').lower()
elif filter_type == 'artist':
return q_lower in (s.artist or '').lower()
elif filter_type == 'band':
return q_lower in (s.band or '').lower()
else: # all
return (q_lower in (s.title or '').lower() or
q_lower in (s.artist or '').lower() or
q_lower in (s.band or '').lower())
for s in local_songs:
if match_local(s):
snippet = (s.lyrics or '')[:150] + '...' if len(s.lyrics or '') > 150 else (s.lyrics or '')
results.append({
'title': s.title,
'artist': s.artist,
'band': s.band,
'snippet': snippet,
'source': 'local_db'
})
finally:
pass # Session cleanup handled by teardown_appcontext
# Then search ChartLyrics (free API)
try:
# ChartLyrics search - expects artist and song
# We'll try different combinations based on filter
search_terms = []
if filter_type == 'artist':
search_terms.append({'artist': q, 'song': ''})
elif filter_type == 'title':
search_terms.append({'artist': '', 'song': q})
else:
# For 'all' and 'band', try both
search_terms.append({'artist': q, 'song': ''})
search_terms.append({'artist': '', 'song': q})
for term in search_terms:
try:
artist_encoded = urllib.parse.quote(term['artist'])
song_encoded = urllib.parse.quote(term['song'])
url = f'http://api.chartlyrics.com/apiv1.asmx/SearchLyric?artist={artist_encoded}&song={song_encoded}'
with urllib.request.urlopen(url, timeout=5) as response:
xml_data = response.read()
root = ET.fromstring(xml_data)
# Parse XML results (ChartLyrics returns XML)
namespace = {'cl': 'http://api.chartlyrics.com/'}
for item in root.findall('.//cl:SearchLyricResult', namespace):
title = item.find('cl:Song', namespace)
artist = item.find('cl:Artist', namespace)
lyric_id = item.find('cl:LyricId', namespace)
if title is not None and artist is not None:
# Get actual lyrics
snippet = "Preview available - click to view full lyrics"
results.append({
'title': title.text or 'Unknown',
'artist': artist.text or 'Unknown',
'band': artist.text or 'Unknown',
'snippet': snippet,
'source': 'chartlyrics',
'lyric_id': lyric_id.text if lyric_id is not None else None
})
# Limit to 5 results per search
if len([r for r in results if r.get('source') == 'chartlyrics']) >= 5:
break
except:
continue
except Exception as e:
# If external API fails, we still have local results
pass
# (Genius integration removed per user request)
# Remove duplicates and limit total results
seen = set()
unique_results = []
for r in results:
key = (r['title'].lower(), r['artist'].lower())
if key not in seen:
seen.add(key)
unique_results.append(r)
if len(unique_results) >= 10:
break
# Add Lifeway Worship search link as an additional source entry
try:
import urllib.parse as _urlparse
lifeway_url = f"https://worship.lifeway.com/findAndBuy/home?findMappable=false&searchString={_urlparse.quote(q)}"
unique_results.append({
'title': f'Search on Lifeway: {q}',
'artist': '',
'band': 'Lifeway Worship',
'snippet': 'Open Lifeway Worship results for this query',
'source': 'lifeway',
'url': lifeway_url
})
except Exception:
pass
return jsonify({
'query': q,
'filter': filter_type,
'results': unique_results,
'sources': 'Local DB + ChartLyrics (+ Lifeway link)'
})
@app.route('/api/export/<int:plan_id>')
@rate_limit(max_per_minute=10)
def export_plan(plan_id):
"""Export a plan's songs as a formatted text document."""
if plan_id < 0:
return jsonify({'error':'invalid_plan_id'}), 400
db = get_db()
try:
plan = db.query(Plan).filter(Plan.id == plan_id).first()
if not plan:
return jsonify({'error':'plan_not_found'}), 404
profile = db.query(Profile).get(plan.profile_id) if plan.profile_id else None
plan_songs = db.query(PlanSong).filter(PlanSong.plan_id==plan_id).order_by(PlanSong.order_index).all()
lines = []
lines.append('=' * 60)
lines.append(f'WORSHIP SETLIST: {plan.date}')
if profile:
lines.append(f'Profile: {profile.name} (Key: {profile.default_key})')
if plan.notes:
lines.append(f'Notes: {plan.notes}')
lines.append('=' * 60)
lines.append('')
for idx, ps in enumerate(plan_songs, 1):
song = db.query(Song).filter(Song.id == ps.song_id).first()
if not song:
continue
lines.append(f'{idx}. {song.title}')
lines.append(f' Artist: {song.artist or "Unknown"} | Band: {song.band or "Unknown"}')
lines.append('')
if song.lyrics:
lines.append(' LYRICS:')
for line in song.lyrics.splitlines():
lines.append(f' {line}')
lines.append('')
if song.chords:
lines.append(' CHORDS:')
for line in song.chords.splitlines():
lines.append(f' {line}')
lines.append('')
lines.append('-' * 60)
lines.append('')
content = '\n'.join(lines)
response = app.make_response(content)
response.headers['Content-Type'] = 'text/plain; charset=utf-8'
response.headers['Content-Disposition'] = f'attachment; filename="setlist_{plan.date}.txt"'
return response
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
pass # Session cleanup handled by teardown_appcontext
# ==============================================================================
# USER MANAGEMENT ENDPOINTS
# ==============================================================================
@app.route('/api/auth/login', methods=['POST'])
@rate_limit(max_per_minute=5) # Stricter rate limit for login
def login():
"""Authenticate user with username and password"""
data = request.get_json() or {}
username = sanitize_text(data.get('username', ''), 255).strip()
password = data.get('password', '').strip()
if not username or not password:
return jsonify({'success': False, 'error': 'username_password_required'}), 400
# Rate limiting: track failed attempts
if len(password) > 128: # Prevent DoS via long passwords
return jsonify({'success': False, 'error': 'invalid_credentials'}), 401
db = get_db()
try:
# Query User table
user = db.query(User).filter(User.username == username).first()
if not user:
# Constant-time response to prevent user enumeration
# Still hash a dummy password to prevent timing attacks
import bcrypt
bcrypt.checkpw(b'dummy', bcrypt.gensalt())
logger.warning(f'Failed login attempt for non-existent user: {username} from {request.remote_addr}')
return jsonify({'success': False, 'error': 'invalid_credentials'}), 401
# Check if user is active
if not user.active:
logger.warning(f'Login attempt for disabled account: {username} from {request.remote_addr}')
return jsonify({'success': False, 'error': 'account_disabled'}), 403
# Verify password
if user.check_password(password):
# SECURITY: Regenerate session ID to prevent session fixation
old_session_data = dict(session)
session.clear()
# Update last login
user.last_login = datetime.now()
db.commit()
# Set new session with regenerated ID
session['username'] = user.username
session['role'] = user.role
session['permissions'] = user.get_permissions_list()
session['login_time'] = datetime.now().isoformat()
session.permanent = True
logger.info(f'Successful login for {username} from {request.remote_addr}')
return jsonify({
'success': True,
'username': user.username,
'role': user.role,
'permissions': user.get_permissions_list()
})
else:
logger.warning(f'Failed login attempt for {username} from {request.remote_addr}')
return jsonify({'success': False, 'error': 'invalid_credentials'}), 401
except Exception as e:
logger.error(f"Login error: {e}")
return jsonify({'success': False, 'error': 'server_error'}), 500
@app.route('/api/auth/logout', methods=['POST'])
@rate_limit(max_per_minute=30)
def logout():
"""Logout current user"""
session.clear()
return jsonify({'success': True})
@app.route('/api/users', methods=['GET', 'POST'])
@rate_limit(max_per_minute=30)
@require_auth
@require_permission('settings')
def users():
"""Get all users or create a new user"""
db = get_db()
try:
if request.method == 'GET':
# Check if requester is admin or has settings permission
username = session.get('username')
if not username:
return jsonify({'error': 'unauthorized', 'message': 'Not logged in'}), 401
user = db.query(User).filter(User.username == username).first()
if not user or (user.role != 'admin' and not user.has_permission('settings')):
return jsonify({'error': 'admin_only', 'message': 'Admin access or Settings permission required'}), 403
# Get all users
all_users = db.query(User).all()
return jsonify([{
'id': u.id,
'username': u.username,
'role': u.role,
'permissions': u.get_permissions_list(),
'active': u.active,
'created_at': u.created_at.isoformat() if u.created_at else None,
'last_login': u.last_login.isoformat() if u.last_login else None
} for u in all_users])
# POST - Create new user
# Check if requester is admin or has settings permission
username_auth = session.get('username')
if not username_auth:
return jsonify({'error': 'unauthorized', 'message': 'Not logged in'}), 401
user_auth = db.query(User).filter(User.username == username_auth).first()
if not user_auth or (user_auth.role != 'admin' and not user_auth.has_permission('settings')):
return jsonify({'error': 'admin_only', 'message': 'Admin access or Settings permission required'}), 403
data = request.get_json() or {}
username = data.get('username', '').strip()
password = data.get('password', '').strip()
role = data.get('role', 'viewer').strip()
permissions = data.get('permissions', ['view'])
if not username or not password:
return jsonify({'error': 'username_password_required'}), 400
# Check if username exists
existing = db.query(User).filter(User.username == username).first()
if existing:
return jsonify({'error': 'username_exists'}), 409
# Create user
new_user = User(
username=username,
role=role,
permissions=','.join(permissions) if isinstance(permissions, list) else permissions,
active=True
)
new_user.set_password(password)
db.add(new_user)
db.commit()
db.refresh(new_user)
return jsonify({
'success': True,
'id': new_user.id,
'username': new_user.username,
'role': new_user.role,
'permissions': new_user.get_permissions_list(),
'active': new_user.active
}), 201
except Exception as e:
db.rollback()
logger.error(f"User management error: {e}")
return jsonify({'error': str(e)}), 500
finally:
pass
@app.route('/api/users/<user_id>', methods=['GET', 'PUT', 'DELETE'])
@rate_limit(max_per_minute=30)
@require_auth
@require_permission('settings')
def user_detail(user_id):
"""Get, update, or delete a specific user"""
db = get_db()
try:
user = db.query(User).filter(User.id == user_id).first()
if not user:
return jsonify({'error': 'user_not_found'}), 404
if request.method == 'GET':
return jsonify({
'id': user.id,
'username': user.username,
'role': user.role,
'permissions': user.get_permissions_list(),
'active': user.active,
'created_at': user.created_at.isoformat() if user.created_at else None,
'last_login': user.last_login.isoformat() if user.last_login else None
})
# Check admin access for PUT/DELETE
auth_user = session.get('username')
auth_user_obj = db.query(User).filter(User.username == auth_user).first()
if not auth_user_obj or (auth_user_obj.role != 'admin' and not auth_user_obj.has_permission('settings')):
return jsonify({'error': 'admin_only', 'message': 'Admin access or Settings permission required'}), 403
if request.method == 'PUT':
data = request.get_json() or {}
# Update username if provided and different
if 'username' in data and data['username'] and data['username'] != user.username:
new_username = data['username'].strip()
# Check if new username already exists
existing = db.query(User).filter(User.username == new_username, User.id != user_id).first()
if existing:
return jsonify({'success': False, 'error': 'username_already_exists'}), 409
# Update biometric credentials to point to new username
from postgresql_models import BiometricCredential
biometric_creds = db.query(BiometricCredential).filter(BiometricCredential.user_id == user.id).all()
for cred in biometric_creds:
cred.username = new_username
user.username = new_username
# Update password if provided
if 'password' in data and data['password']:
user.set_password(data['password'])
# Update role if provided
if 'role' in data:
user.role = data['role']
# Update permissions if provided
if 'permissions' in data:
perms = data['permissions']
user.permissions = ','.join(perms) if isinstance(perms, list) else perms
# Update active status if provided
if 'active' in data:
user.active = bool(data['active'])
user.updated_at = datetime.now()
db.commit()
db.refresh(user)
return jsonify({
'success': True,
'id': user.id,
'username': user.username,
'role': user.role,
'permissions': user.get_permissions_list(),
'active': user.active
})
# DELETE
db.delete(user)
db.commit()
return jsonify({'success': True, 'status': 'deleted'})
except Exception as e:
db.rollback()
logger.error(f"User detail error: {e}")
return jsonify({'error': str(e)}), 500
finally:
pass
@app.route('/api/auth/check', methods=['GET'])
@rate_limit(max_per_minute=60)
def check_auth():
"""Check if current session is authenticated and return user info"""
username = session.get('username')
role = session.get('role')
if not username:
return jsonify({'authenticated': False}), 401
# Validate session hasn't expired (additional check)
login_time_str = session.get('login_time')
if login_time_str:
from datetime import datetime, timedelta
try:
login_time = datetime.fromisoformat(login_time_str)
if datetime.now() - login_time > timedelta(hours=8):
session.clear()
return jsonify({'authenticated': False, 'reason': 'session_expired'}), 401
except (ValueError, TypeError):
pass
db = get_db()
try:
# Check User table first
user = db.query(User).filter(User.username == username).first()
if user:
# User found in database
if not user.active:
session.clear()
return jsonify({'authenticated': False}), 401
# Update session with latest user data
session['role'] = user.role
session['permissions'] = user.get_permissions_list()
return jsonify({
'authenticated': True,
'username': user.username,
'role': user.role,
'permissions': user.get_permissions_list()
})
else:
# Fallback: session exists but user not in database yet
# This handles legacy logins - return session data
if role:
return jsonify({
'authenticated': True,
'username': username,
'role': role,
'permissions': session.get('permissions', ['view'])
})
else:
# Invalid session
session.clear()
return jsonify({'authenticated': False}), 401
except Exception as e:
logger.error(f"Auth check error: {e}")
return jsonify({'authenticated': False}), 500
# ==============================================================================
# BIOMETRIC AUTHENTICATION ENDPOINTS
# ==============================================================================
@app.route('/api/biometric/register', methods=['POST'])
@rate_limit(max_per_minute=10)
def register_biometric():
"""Register biometric for a user"""
try:
data = request.json
username = data.get('username')
device_name = data.get('deviceName', 'Unknown Device')
device_fingerprint = data.get('deviceFingerprint')
device_info = data.get('deviceInfo', '')
if not all([username, device_fingerprint]):
return jsonify({'error': 'Missing username or device fingerprint'}), 400
result = register_biometric_credential(
username, device_name, device_fingerprint, device_info
)
if result['success']:
return jsonify(result)
else:
return jsonify(result), 400
except Exception as e:
logger.error(f"Error in biometric registration: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/biometric/authenticate', methods=['POST'])
@rate_limit(max_per_minute=30)
def authenticate_biometric_endpoint():
"""Authenticate using biometric - device already verified fingerprint"""
try:
data = request.json
username = data.get('username') # Optional - can lookup by device fingerprint alone
device_fingerprint = data.get('deviceFingerprint')
logger.info(f"Biometric auth attempt for {username or 'device lookup'} from device {device_fingerprint}")
if not device_fingerprint:
logger.error("Missing device fingerprint")
return jsonify({'error': 'Device fingerprint required'}), 400
result = authenticate_biometric_by_device(username, device_fingerprint)
if result['success']:
# Set Flask session - use username from result (may have been looked up by device)
try:
username = result['username']
session['authenticated'] = True
session['username'] = username
session['role'] = result['role']
session['user_id'] = username # Could get actual user_id if needed
session['auth_time'] = datetime.now().isoformat()
logger.info(f"Biometric auth successful for {username}, role: {result['role']}")
except Exception as session_err:
logger.error(f"Error setting session: {session_err}")
return jsonify(result)
else:
logger.warning(f"Biometric auth failed for {username}: {result.get('error')}")
return jsonify(result), 401
except Exception as e:
logger.error(f"Error in biometric authentication: {e}", exc_info=True)
return jsonify({'error': str(e)}), 500
@app.route('/api/biometric/status/<username>', methods=['GET'])
@rate_limit(max_per_minute=30)
def get_biometric_status(username):
"""Get biometric status for a user"""
try:
result = get_user_biometric_status(username)
return jsonify(result)
except Exception as e:
logger.error(f"Error getting biometric status: {e}")
return jsonify({'error': str(e)}), 500
result = get_user_credentials(username)
return jsonify(result)
except Exception as e:
logger.error(f"Error fetching credentials: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/biometric/credentials/<credential_id>', methods=['PUT'])
@rate_limit(max_per_minute=30)
def toggle_biometric_credential(credential_id):
"""Enable or disable a biometric credential"""
try:
data = request.json
enabled = data.get('enabled', True)
result = toggle_credential(credential_id, enabled)
if result['success']:
return jsonify(result)
else:
return jsonify(result), 404
except Exception as e:
logger.error(f"Error toggling credential: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/biometric/credentials/<credential_id>', methods=['DELETE'])
@rate_limit(max_per_minute=30)
def delete_biometric_credential(credential_id):
"""Delete a biometric credential"""
try:
result = delete_credential(credential_id)
if result['success']:
return jsonify(result)
else:
return jsonify(result), 404
except Exception as e:
logger.error(f"Error deleting credential: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/biometric/remove/<username>', methods=['DELETE'])
@rate_limit(max_per_minute=30)
def remove_user_biometric(username):
"""Remove all biometric credentials for a user"""
try:
result = delete_all_user_biometrics(username)
if result['success']:
return jsonify(result)
else:
return jsonify(result), 500
except Exception as e:
logger.error(f"Error removing biometric: {e}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Production mode: disable debug
app.run(host='0.0.0.0', port=FLASK_PORT, debug=False)