1676 lines
64 KiB
Python
1676 lines
64 KiB
Python
from flask import Flask, jsonify, request, make_response, session
|
|
from flask_cors import CORS
|
|
from flask_caching import Cache
|
|
from flask_compress import Compress
|
|
from datetime import datetime, timedelta
|
|
from postgresql_models import init_db, SessionLocal, Profile, Song, Plan, PlanSong, ProfileSong, ProfileSongKey, BiometricCredential, User
|
|
from functools import lru_cache
|
|
import hashlib
|
|
import hmac
|
|
import base64
|
|
import uuid
|
|
try:
|
|
from dotenv import load_dotenv
|
|
load_dotenv() # Loads variables from a .env file if present
|
|
except ImportError:
|
|
# dotenv not installed yet; proceed without loading .env
|
|
pass
|
|
|
|
import os
|
|
import json
|
|
import secrets
|
|
import logging
|
|
import re
|
|
# Import bleach if available, but not required for basic operation
|
|
try:
|
|
import bleach
|
|
BLEACH_AVAILABLE = True
|
|
except ImportError:
|
|
BLEACH_AVAILABLE = False
|
|
logger.warning('bleach not installed - using basic XSS protection')
|
|
|
|
from rate_limiter import rate_limit, get_rate_limit_headers
|
|
from validators import validate_request_data, PROFILE_SCHEMA, SONG_SCHEMA, PLAN_SCHEMA, sanitize_filename
|
|
from biometric_auth import (
|
|
register_biometric_credential, authenticate_biometric_by_device,
|
|
get_user_biometric_status, delete_all_user_biometrics
|
|
)
|
|
from helpers import (
|
|
success_response, error_response, not_found_response, validation_error,
|
|
sanitize_text, validate_id, serialize_profile, serialize_song, serialize_plan,
|
|
extract_profile_data, extract_song_data, extract_plan_data, search_songs,
|
|
update_model_fields, safe_db_operation, NotFoundError, get_or_404
|
|
)
|
|
|
|
# Configure logging
|
|
log_dir = os.path.join(os.path.dirname(__file__), 'logs')
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler(os.path.join(log_dir, 'app.log')),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = Flask(__name__)
|
|
|
|
# Generate strong secret key for sessions
|
|
app.secret_key = os.environ.get('SECRET_KEY') or secrets.token_hex(32)
|
|
|
|
# API Key Authentication (for backend-to-backend calls)
|
|
API_KEY = os.environ.get('API_KEY') or secrets.token_hex(32)
|
|
if os.environ.get('API_KEY'):
|
|
logger.info('API_KEY loaded from environment')
|
|
else:
|
|
logger.warning('API_KEY generated at runtime - set in .env for production')
|
|
|
|
# CSRF Token storage (in-memory for simplicity, use Redis in production)
|
|
csrf_tokens = {}
|
|
|
|
# Performance Optimization - In-Memory Caching with TTL
|
|
cache = Cache(app, config={
|
|
'CACHE_TYPE': 'SimpleCache',
|
|
'CACHE_DEFAULT_TIMEOUT': 300, # 5 minutes default
|
|
'CACHE_THRESHOLD': 500 # Max cached items
|
|
})
|
|
|
|
# Performance Optimization - Gzip Compression
|
|
Compress(app)
|
|
app.config['COMPRESS_MIMETYPES'] = ['text/html', 'text/css', 'text/xml', 'application/json', 'application/javascript']
|
|
app.config['COMPRESS_LEVEL'] = 6 # Balance between speed and compression
|
|
app.config['COMPRESS_MIN_SIZE'] = 500 # Only compress responses > 500 bytes
|
|
|
|
# Session Security Settings (applies to all environments)
|
|
app.config['SESSION_COOKIE_SECURE'] = not app.debug # HTTPS only in production
|
|
app.config['SESSION_COOKIE_HTTPONLY'] = True # Prevent JavaScript access
|
|
app.config['SESSION_COOKIE_SAMESITE'] = 'Strict' # CSRF protection
|
|
app.config['PERMANENT_SESSION_LIFETIME'] = 3600 # 1 hour
|
|
app.config['SESSION_COOKIE_NAME'] = '__Host-session' if not app.debug else 'session' # Secure prefix
|
|
|
|
# Request size limits
|
|
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max request size
|
|
|
|
# Security Headers Middleware
|
|
@app.after_request
|
|
def set_security_headers(response):
|
|
"""Add security headers to all responses"""
|
|
response.headers['X-Content-Type-Options'] = 'nosniff'
|
|
response.headers['X-Frame-Options'] = 'DENY'
|
|
response.headers['X-XSS-Protection'] = '1; mode=block'
|
|
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
|
|
|
|
# Add rate limit headers
|
|
client_id = request.remote_addr or 'unknown'
|
|
rate_headers = get_rate_limit_headers(client_id)
|
|
for key, value in rate_headers.items():
|
|
response.headers[key] = value
|
|
|
|
# Remove server header to prevent information disclosure
|
|
response.headers.pop('Server', None)
|
|
|
|
# Add CSP header
|
|
response.headers['Content-Security-Policy'] = "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'"
|
|
|
|
# Add caching headers based on endpoint type
|
|
if request.path.startswith('/static/') or request.path.endswith(('.js', '.css', '.png', '.jpg', '.svg')):
|
|
response.headers['Cache-Control'] = 'public, max-age=31536000, immutable' # 1 year for static assets
|
|
elif request.path in ['/api/health', '/api/providers']:
|
|
response.headers['Cache-Control'] = 'public, max-age=60' # 1 minute for health/status
|
|
elif request.method == 'GET' and request.path.startswith('/api/'):
|
|
response.headers['Cache-Control'] = 'private, max-age=30, must-revalidate' # 30 seconds for API reads
|
|
elif request.method in ['POST', 'PUT', 'DELETE']:
|
|
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate' # No cache for mutations
|
|
|
|
# Add ETag for GET requests to enable conditional requests
|
|
if request.method == 'GET' and response.status_code == 200:
|
|
if not response.headers.get('ETag'):
|
|
import hashlib
|
|
etag = hashlib.md5(response.get_data()).hexdigest()
|
|
response.headers['ETag'] = f'"{etag}"'
|
|
|
|
# Check If-None-Match header
|
|
if request.headers.get('If-None-Match') == response.headers['ETag']:
|
|
response.status_code = 304
|
|
response.set_data(b'')
|
|
|
|
return response
|
|
|
|
# CORS configuration - restricted to known origins
|
|
ALLOWED_ORIGINS = [
|
|
"http://localhost:5100",
|
|
"https://houseofprayer.ddns.net"
|
|
]
|
|
|
|
# Add development IP if in dev mode
|
|
if os.environ.get('FLASK_ENV') != 'production':
|
|
ALLOWED_ORIGINS.extend([
|
|
"http://192.168.10.130:5100",
|
|
"http://localhost:8080"
|
|
])
|
|
|
|
CORS(app, resources={
|
|
r"/api/*": {
|
|
"origins": ALLOWED_ORIGINS,
|
|
"methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
|
|
"allow_headers": ["Content-Type", "Authorization"],
|
|
"supports_credentials": True,
|
|
"max_age": 3600
|
|
}
|
|
})
|
|
init_db()
|
|
FLASK_PORT = int(os.environ.get('FLASK_PORT', '8080'))
|
|
|
|
# Authentication and Authorization Functions
|
|
def generate_csrf_token():
|
|
"""Generate a CSRF token for the session"""
|
|
if 'csrf_token' not in session:
|
|
session['csrf_token'] = secrets.token_hex(32)
|
|
return session['csrf_token']
|
|
|
|
def verify_csrf_token():
|
|
"""Verify CSRF token from request"""
|
|
token = request.headers.get('X-CSRF-Token') or request.form.get('csrf_token')
|
|
if not token or not session.get('csrf_token'):
|
|
return False
|
|
return hmac.compare_digest(token, session['csrf_token'])
|
|
|
|
def require_api_key(f):
|
|
"""Decorator to require API key for administrative endpoints"""
|
|
from functools import wraps
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
api_key = request.headers.get('X-API-Key')
|
|
if not api_key or not hmac.compare_digest(api_key, API_KEY):
|
|
logger.warning(f'Unauthorized API access attempt from {request.remote_addr}')
|
|
return jsonify({'error': 'unauthorized', 'message': 'Invalid or missing API key'}), 401
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def require_csrf(f):
|
|
"""Decorator to require CSRF token for state-changing operations"""
|
|
from functools import wraps
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if request.method in ['POST', 'PUT', 'DELETE', 'PATCH']:
|
|
if not verify_csrf_token():
|
|
logger.warning(f'CSRF validation failed from {request.remote_addr}')
|
|
return jsonify({'error': 'csrf_validation_failed', 'message': 'Invalid or missing CSRF token'}), 403
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def require_auth(f):
|
|
"""Decorator to require authentication"""
|
|
from functools import wraps
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'username' not in session:
|
|
logger.warning(f'Unauthorized access attempt from {request.remote_addr}')
|
|
return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def require_permission(permission):
|
|
"""Decorator to require specific permission"""
|
|
from functools import wraps
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
username = session.get('username')
|
|
if not username:
|
|
return jsonify({'error': 'unauthorized', 'message': 'Authentication required'}), 401
|
|
|
|
db = get_db()
|
|
try:
|
|
user = db.query(User).filter(User.username == username).first()
|
|
if not user or not user.has_permission(permission):
|
|
logger.warning(f'Permission denied for {username} on {request.endpoint}')
|
|
return jsonify({'error': 'forbidden', 'message': f'Permission "{permission}" required'}), 403
|
|
finally:
|
|
pass
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return decorator
|
|
|
|
# CSRF Token endpoint
|
|
@app.route('/api/csrf-token', methods=['GET'])
|
|
def get_csrf_token():
|
|
"""Get CSRF token for the session"""
|
|
token = generate_csrf_token()
|
|
return jsonify({'csrf_token': token})
|
|
|
|
# Validate critical environment variables in production
|
|
if not app.debug:
|
|
required_vars = ['POSTGRESQL_URI', 'SECRET_KEY']
|
|
missing = [var for var in required_vars if not os.environ.get(var)]
|
|
if missing:
|
|
print(f"WARNING: Missing critical environment variables: {', '.join(missing)}")
|
|
print("Please configure these in your .env file for production use.")
|
|
|
|
def get_db():
|
|
"""Create a new database session with automatic cleanup
|
|
|
|
IMPORTANT: Do NOT call db.close() in finally blocks when using scoped_session.
|
|
The teardown_appcontext handler automatically cleans up sessions.
|
|
Calling close() manually causes connection pool exhaustion.
|
|
"""
|
|
return SessionLocal()
|
|
|
|
@app.teardown_appcontext
|
|
def cleanup_session(exception=None):
|
|
"""Cleanup scoped database sessions after each request to prevent connection pool exhaustion
|
|
|
|
This is automatically called by Flask after every request, ensuring proper
|
|
session cleanup without manual intervention in route handlers.
|
|
"""
|
|
try:
|
|
SessionLocal.remove()
|
|
except Exception as e:
|
|
logger.error(f"Error during session cleanup: {e}")
|
|
# Force cleanup on error
|
|
try:
|
|
engine.dispose()
|
|
except:
|
|
pass
|
|
|
|
def extract_text_from_file(file_storage):
|
|
"""Extract text from an uploaded file (docx, pdf, image, txt)."""
|
|
filename = file_storage.filename or ''
|
|
lower = filename.lower()
|
|
data = file_storage.read()
|
|
# Reset stream position if needed for certain libraries
|
|
import io
|
|
stream = io.BytesIO(data)
|
|
text = ''
|
|
try:
|
|
if lower.endswith('.txt'):
|
|
text = data.decode(errors='ignore')
|
|
elif lower.endswith('.docx'):
|
|
try:
|
|
import docx
|
|
doc = docx.Document(stream)
|
|
text = '\n'.join(p.text for p in doc.paragraphs)
|
|
except Exception:
|
|
text = ''
|
|
elif lower.endswith('.pdf'):
|
|
try:
|
|
import PyPDF2
|
|
reader = PyPDF2.PdfReader(stream)
|
|
parts = []
|
|
for page in reader.pages[:20]: # safety limit
|
|
try:
|
|
parts.append(page.extract_text() or '')
|
|
except Exception:
|
|
continue
|
|
text = '\n'.join(parts)
|
|
# OCR fallback if minimal text extracted (likely scanned PDF)
|
|
if len(text.strip()) < 50:
|
|
try:
|
|
from pdf2image import convert_from_bytes
|
|
from PIL import Image
|
|
import pytesseract
|
|
images = convert_from_bytes(data, first_page=1, last_page=min(10, len(reader.pages)))
|
|
ocr_parts = []
|
|
for img in images:
|
|
ocr_parts.append(pytesseract.image_to_string(img))
|
|
ocr_text = '\n'.join(ocr_parts)
|
|
if len(ocr_text.strip()) > len(text.strip()):
|
|
text = ocr_text
|
|
except Exception:
|
|
pass # Fall back to original text layer extraction
|
|
except Exception:
|
|
text = ''
|
|
elif lower.endswith(('.png','.jpg','.jpeg','.tif','.tiff')):
|
|
try:
|
|
from PIL import Image
|
|
import pytesseract
|
|
img = Image.open(stream)
|
|
text = pytesseract.image_to_string(img)
|
|
except Exception:
|
|
text = ''
|
|
else:
|
|
# Attempt generic decode
|
|
try:
|
|
text = data.decode(errors='ignore')
|
|
except Exception:
|
|
text = ''
|
|
except Exception:
|
|
text = ''
|
|
# Basic cleanup
|
|
cleaned = '\n'.join(line.rstrip() for line in (text or '').splitlines())
|
|
return cleaned.strip()
|
|
|
|
# Admin restore from data.json (profiles and songs)
|
|
@app.route('/api/admin/restore', methods=['POST'])
|
|
@rate_limit(max_per_minute=5)
|
|
@require_api_key # Requires API key for admin operations
|
|
def admin_restore():
|
|
logger.info(f'Admin restore initiated from {request.remote_addr}')
|
|
|
|
# Determine data.json location: prefer backend/data.json, fallback to project root
|
|
backend_path = os.path.join(os.path.dirname(__file__), 'data.json')
|
|
root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data.json'))
|
|
source_path = backend_path if os.path.exists(backend_path) else root_path if os.path.exists(root_path) else None
|
|
if not source_path:
|
|
return jsonify({ 'ok': False, 'error': 'data.json not found in backend/ or project root' }), 404
|
|
|
|
try:
|
|
with open(source_path, 'r', encoding='utf-8') as f:
|
|
payload = json.load(f)
|
|
except Exception as e:
|
|
return jsonify({ 'ok': False, 'error': f'Failed to read data.json: {str(e)}' }), 400
|
|
|
|
# Validate payload structure
|
|
if not isinstance(payload, dict):
|
|
return jsonify({ 'ok': False, 'error': 'Invalid data format' }), 400
|
|
|
|
db = get_db()
|
|
created = { 'profiles': 0, 'songs': 0 }
|
|
updated = { 'profiles': 0, 'songs': 0 }
|
|
|
|
try:
|
|
import uuid
|
|
# Restore profiles
|
|
profiles = payload.get('profiles', [])
|
|
for p in profiles:
|
|
name = (p.get('name') or f"{(p.get('first_name') or '').strip()} {(p.get('last_name') or '').strip()}" ).strip()
|
|
if not name:
|
|
continue
|
|
existing = db.query(Profile).filter(Profile.name == name).first()
|
|
if existing:
|
|
changed = False
|
|
for key in ['email','contact_number','default_key','notes']:
|
|
if p.get(key) and getattr(existing, key, None) != p.get(key):
|
|
setattr(existing, key, p.get(key))
|
|
changed = True
|
|
if changed:
|
|
updated['profiles'] += 1
|
|
else:
|
|
profile_id = str(p.get('id')) if p.get('id') else str(uuid.uuid4())
|
|
new_p = Profile(
|
|
id=profile_id,
|
|
name=name,
|
|
email=p.get('email') or '',
|
|
contact_number=p.get('contact_number') or '',
|
|
default_key=p.get('default_key') or 'C',
|
|
notes=p.get('notes') or ''
|
|
)
|
|
db.add(new_p)
|
|
created['profiles'] += 1
|
|
|
|
# Restore songs
|
|
songs = payload.get('songs', [])
|
|
for s in songs:
|
|
title = (s.get('title') or '').strip()
|
|
if not title:
|
|
continue
|
|
existing = db.query(Song).filter(Song.title == title).first()
|
|
lyrics = s.get('lyrics') or s.get('content') or ''
|
|
chords = s.get('chords') or ''
|
|
singer = s.get('singer') or s.get('artist') or ''
|
|
artist = s.get('artist') or ''
|
|
band = s.get('band') or ''
|
|
if existing:
|
|
changed = False
|
|
if lyrics and (existing.lyrics or '') != lyrics:
|
|
existing.lyrics = lyrics
|
|
changed = True
|
|
if chords and (existing.chords or '') != chords:
|
|
existing.chords = chords
|
|
changed = True
|
|
if singer and (existing.singer or '') != singer:
|
|
existing.singer = singer
|
|
changed = True
|
|
if artist and (existing.artist or '') != artist:
|
|
existing.artist = artist
|
|
changed = True
|
|
if band and (existing.band or '') != band:
|
|
existing.band = band
|
|
changed = True
|
|
if changed:
|
|
updated['songs'] += 1
|
|
else:
|
|
song_id = str(s.get('id')) if s.get('id') else str(uuid.uuid4())
|
|
new_s = Song(id=song_id, title=title, artist=artist, band=band, singer=singer, lyrics=lyrics, chords=chords)
|
|
db.add(new_s)
|
|
created['songs'] += 1
|
|
|
|
db.commit()
|
|
return jsonify({ 'ok': True, 'profiles_created': created['profiles'], 'profiles_updated': updated['profiles'], 'songs_created': created['songs'], 'songs_updated': updated['songs'], 'source': source_path })
|
|
except Exception as e:
|
|
db.rollback()
|
|
return jsonify({ 'ok': False, 'error': str(e) }), 500
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
@app.route('/api/upload_lyric', methods=['POST'])
|
|
@rate_limit(max_per_minute=10)
|
|
def upload_lyric():
|
|
"""Accept a lyric file and return extracted text. Does NOT create a Song automatically."""
|
|
if 'file' not in request.files:
|
|
return jsonify({'error':'file_missing'}), 400
|
|
f = request.files['file']
|
|
if not f.filename:
|
|
return jsonify({'error':'empty_filename'}), 400
|
|
|
|
# Sanitize filename to prevent path traversal attacks
|
|
safe_filename = sanitize_filename(f.filename)
|
|
|
|
# Validate file extension
|
|
allowed_extensions = {'.txt', '.docx', '.pdf', '.png', '.jpg', '.jpeg', '.tif', '.tiff'}
|
|
file_ext = os.path.splitext(safe_filename)[1].lower()
|
|
if file_ext not in allowed_extensions:
|
|
logger.warning(f'Invalid file upload attempt: {file_ext} from {request.remote_addr}')
|
|
return jsonify({'error':'invalid_file_type', 'message':'Only text, docx, pdf, and image files are allowed'}), 400
|
|
|
|
# Validate file size (max 10MB)
|
|
f.seek(0, 2) # Seek to end
|
|
file_size = f.tell()
|
|
f.seek(0) # Reset to beginning
|
|
if file_size > 10 * 1024 * 1024:
|
|
return jsonify({'error':'file_too_large', 'max_size': '10MB'}), 413
|
|
|
|
# Validate filename to prevent path traversal
|
|
if '..' in f.filename or '/' in f.filename or '\\' in f.filename:
|
|
return jsonify({'error':'invalid_filename'}), 400
|
|
|
|
# Optional metadata fields from form
|
|
title = (request.form.get('title') or f.filename.rsplit('.',1)[0])[:500] # Limit length
|
|
artist = (request.form.get('artist') or '')[:500]
|
|
band = (request.form.get('band') or '')[:500]
|
|
extracted = extract_text_from_file(f)
|
|
if not extracted:
|
|
return jsonify({'error':'extraction_failed','title':title}), 422
|
|
# Return without saving; front-end will present in modal for editing & saving
|
|
sample = extracted[:150] + ('...' if len(extracted) > 150 else '')
|
|
return jsonify({
|
|
'status':'ok',
|
|
'title': title,
|
|
'artist': artist,
|
|
'band': band,
|
|
'lyrics': extracted,
|
|
'preview': sample,
|
|
'length': len(extracted)
|
|
})
|
|
|
|
@app.route('/api/providers')
|
|
@rate_limit(max_per_minute=60)
|
|
def providers():
|
|
"""Report which external provider tokens are configured (OpenAI removed)."""
|
|
return jsonify({
|
|
'local_db': True,
|
|
'chartlyrics': True,
|
|
'lifeway_link': True
|
|
})
|
|
|
|
@app.route('/')
|
|
def index():
|
|
return jsonify({'message': 'House of Prayer Song Lyrics API (Flask)', 'port': FLASK_PORT})
|
|
|
|
@app.route('/api/health')
|
|
def health():
|
|
return jsonify({'status': 'ok', 'ts': datetime.utcnow().isoformat()})
|
|
|
|
# Profile Selection Management
|
|
@app.route('/api/profile-selection/clear', methods=['POST', 'OPTIONS'])
|
|
def clear_profile_selection():
|
|
"""Clear profile selection (frontend manages this in localStorage, but endpoint needed for API compatibility)"""
|
|
if request.method == 'OPTIONS':
|
|
return '', 204
|
|
return jsonify({'status': 'ok', 'message': 'Profile selection cleared'})
|
|
|
|
# Profiles CRUD
|
|
@app.route('/api/profiles', methods=['GET','POST'])
|
|
@rate_limit(max_per_minute=600)
|
|
@require_auth
|
|
def profiles():
|
|
db = get_db()
|
|
try:
|
|
if request.method == 'GET':
|
|
# Cache profiles list for 5 minutes
|
|
cache_key = f'profiles_list_{session.get("username", "anonymous")}'
|
|
cached = cache.get(cache_key)
|
|
if cached is not None:
|
|
return jsonify(cached)
|
|
|
|
items = db.query(Profile).all()
|
|
result = [serialize_profile(p, include_song_count=True, db=db) for p in items]
|
|
cache.set(cache_key, result, timeout=300)
|
|
return jsonify(result)
|
|
|
|
import uuid
|
|
data = request.get_json() or {}
|
|
|
|
# Validate name
|
|
profile_data = extract_profile_data(data)
|
|
if not profile_data['name']:
|
|
return validation_error('name', 'Name is required')
|
|
|
|
profile_id = data.get('id') or str(uuid.uuid4())
|
|
p = Profile(id=profile_id, **profile_data)
|
|
db.add(p)
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'profiles_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Profile created: {profile_id} from {request.remote_addr}')
|
|
return jsonify(serialize_profile(p, include_song_count=True, db=db))
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f'Error in profiles endpoint: {e}')
|
|
return error_response('server_error', str(e), 500)
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
@app.route('/api/profiles/<pid>', methods=['PUT','DELETE'])
|
|
@rate_limit(max_per_minute=30)
|
|
@require_auth
|
|
@require_permission('edit')
|
|
def profile_item(pid):
|
|
# Validate ID
|
|
if not validate_id(pid):
|
|
return validation_error('id', 'Invalid profile ID format')
|
|
|
|
db = get_db()
|
|
try:
|
|
p = db.query(Profile).filter(Profile.id == pid).first()
|
|
if not p:
|
|
return not_found_response('Profile')
|
|
|
|
if request.method == 'PUT':
|
|
d = request.get_json() or {}
|
|
profile_data = extract_profile_data(d)
|
|
update_model_fields(p, profile_data)
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'profiles_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Profile updated: {pid}')
|
|
return jsonify(serialize_profile(p, include_song_count=True, db=db))
|
|
|
|
# DELETE
|
|
db.delete(p)
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'profiles_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Profile deleted: {pid}')
|
|
return success_response({'status': 'deleted'})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f'Error in profile_item: {e}')
|
|
return error_response('server_error', str(e), 500)
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
# Songs CRUD + search (local)
|
|
@app.route('/api/songs', methods=['GET','POST'])
|
|
@rate_limit(max_per_minute=300)
|
|
@require_auth
|
|
def songs():
|
|
db = get_db()
|
|
try:
|
|
if request.method == 'GET':
|
|
q = (request.args.get('q','') or '')[:500]
|
|
items = search_songs(db, Song, q)
|
|
# Include lyrics preview (first 200 chars) for Database component
|
|
return jsonify([serialize_song(s, include_full_content=False) for s in items])
|
|
|
|
import uuid
|
|
d = request.get_json() or {}
|
|
|
|
# Validate and extract song data
|
|
song_data = extract_song_data(d)
|
|
if not song_data['title'].strip():
|
|
return validation_error('title', 'Title is required')
|
|
|
|
song_id = d.get('id') or str(uuid.uuid4())
|
|
s = Song(id=song_id, **song_data)
|
|
db.add(s)
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'songs_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Song created: {song_id}')
|
|
return jsonify({'id': s.id})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f'Error in songs endpoint: {e}')
|
|
return error_response('server_error', str(e), 500)
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
@app.route('/api/songs/<sid>', methods=['GET','PUT','DELETE'])
|
|
@rate_limit(max_per_minute=30)
|
|
@require_auth
|
|
def song_item(sid):
|
|
# Validate ID
|
|
if not validate_id(sid):
|
|
return validation_error('id', 'Invalid song ID format')
|
|
|
|
db = get_db()
|
|
try:
|
|
s = db.query(Song).filter(Song.id == sid).first()
|
|
if not s:
|
|
return not_found_response('Song')
|
|
|
|
if request.method == 'GET':
|
|
# Detach from session to prevent lazy loading
|
|
db.expunge(s)
|
|
return jsonify(serialize_song(s, include_full_content=True))
|
|
|
|
if request.method == 'PUT':
|
|
d = request.get_json() or {}
|
|
song_data = extract_song_data(d)
|
|
update_model_fields(s, song_data)
|
|
s.updated_at = int(datetime.utcnow().timestamp())
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'songs_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Song updated: {sid}')
|
|
return success_response()
|
|
|
|
# DELETE
|
|
db.delete(s)
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'songs_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Song deleted: {sid}')
|
|
return success_response({'status': 'deleted'})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f'Error in song_item: {e}')
|
|
return error_response('server_error', str(e), 500)
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
# Planning (date-based)
|
|
@app.route('/api/plans', methods=['GET','POST'])
|
|
@rate_limit(max_per_minute=300)
|
|
@require_auth
|
|
def plans():
|
|
db = get_db()
|
|
try:
|
|
if request.method == 'GET':
|
|
# Cache plans list for 3 minutes (more volatile data)
|
|
cache_key = f'plans_list_{session.get("username", "anonymous")}'
|
|
cached = cache.get(cache_key)
|
|
if cached is not None:
|
|
return jsonify(cached)
|
|
|
|
items = db.query(Plan).all()
|
|
result = [serialize_plan(p) for p in items]
|
|
cache.set(cache_key, result, timeout=180)
|
|
return jsonify(result)
|
|
|
|
import uuid
|
|
d = request.get_json() or {}
|
|
plan_data = extract_plan_data(d)
|
|
plan_id = d.get('id') or str(uuid.uuid4())
|
|
|
|
# Validate date format (basic check)
|
|
if plan_data['date'] and len(plan_data['date']) > 50:
|
|
return validation_error('date', 'Invalid date format')
|
|
|
|
plan = Plan(id=plan_id, **plan_data)
|
|
db.add(plan)
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'plans_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Plan created: {plan_id}')
|
|
return jsonify({'id': plan.id})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f'Error in plans endpoint: {e}')
|
|
return error_response('server_error', str(e), 500)
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
@app.route('/api/plans/<pid>', methods=['GET','PUT','DELETE'])
|
|
@rate_limit(max_per_minute=300)
|
|
@require_auth
|
|
def plan_detail(pid):
|
|
if not validate_id(pid):
|
|
return validation_error('id', 'Invalid plan ID format')
|
|
|
|
db = get_db()
|
|
try:
|
|
plan = db.query(Plan).filter(Plan.id == pid).first()
|
|
if not plan:
|
|
return not_found_response('Plan')
|
|
|
|
if request.method == 'GET':
|
|
return jsonify(serialize_plan(plan))
|
|
|
|
if request.method == 'PUT':
|
|
d = request.get_json() or {}
|
|
plan_data = extract_plan_data(d)
|
|
update_model_fields(plan, plan_data)
|
|
|
|
# Handle songs if provided
|
|
if 'songs' in d and isinstance(d['songs'], list):
|
|
# Delete existing plan songs
|
|
db.query(PlanSong).filter(PlanSong.plan_id==pid).delete()
|
|
|
|
# Add new songs
|
|
for idx, song in enumerate(d['songs']):
|
|
song_id = song.get('id') if isinstance(song, dict) else song
|
|
if song_id:
|
|
link = PlanSong(
|
|
id=str(uuid.uuid4()),
|
|
plan_id=pid,
|
|
song_id=str(song_id),
|
|
order_index=idx
|
|
)
|
|
db.add(link)
|
|
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'plans_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Plan updated: {pid}')
|
|
return jsonify(serialize_plan(plan))
|
|
|
|
# DELETE
|
|
# First delete all related plan songs
|
|
db.query(PlanSong).filter(PlanSong.plan_id==pid).delete()
|
|
db.delete(plan)
|
|
db.commit()
|
|
|
|
# Invalidate cache
|
|
cache_key = f'plans_list_{session.get("username", "anonymous")}'
|
|
cache.delete(cache_key)
|
|
|
|
logger.info(f'Plan deleted: {pid}')
|
|
return success_response({'status': 'deleted'})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f'Error in plan_detail: {e}')
|
|
return error_response('server_error', str(e), 500)
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
@app.route('/api/plans/<pid>/songs', methods=['GET','POST'])
|
|
@rate_limit(max_per_minute=60)
|
|
def plan_songs(pid):
|
|
if not pid or len(pid) > 255:
|
|
return jsonify({'error':'invalid_id'}), 400
|
|
|
|
db = get_db()
|
|
try:
|
|
plan = db.query(Plan).filter(Plan.id == pid).first()
|
|
if not plan:
|
|
return jsonify({'error':'plan_not_found'}), 404
|
|
|
|
if request.method == 'GET':
|
|
# Optimized: Use JOIN to fetch songs in single query (avoids N+1)
|
|
results = db.query(PlanSong, Song).\
|
|
join(Song, PlanSong.song_id == Song.id).\
|
|
filter(PlanSong.plan_id == pid).\
|
|
order_by(PlanSong.order_index).\
|
|
all()
|
|
|
|
return jsonify([{
|
|
'id': plan_song.id,
|
|
'song_id': song.id,
|
|
'order_index': plan_song.order_index,
|
|
'song': {
|
|
'id': song.id,
|
|
'title': song.title,
|
|
'artist': song.artist or '',
|
|
'band': song.band or '',
|
|
'singer': song.singer or ''
|
|
}
|
|
} for plan_song, song in results])
|
|
|
|
d = request.get_json() or {}
|
|
song_id = d.get('song_id')
|
|
if not song_id:
|
|
return jsonify({'error': 'song_id_required'}), 400
|
|
|
|
order_index = int(d.get('order_index') or 0)
|
|
link_id = str(uuid.uuid4())
|
|
link = PlanSong(id=link_id, plan_id=pid, song_id=song_id, order_index=order_index)
|
|
db.add(link)
|
|
db.commit()
|
|
|
|
# Clear cache after add
|
|
# cache.delete_memoized(plan_songs, pid)
|
|
|
|
return jsonify({'id':link.id})
|
|
except Exception as e:
|
|
db.rollback()
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
# Profile Songs endpoints
|
|
@app.route('/api/profiles/<pid>/songs', methods=['GET','POST'])
|
|
@rate_limit(max_per_minute=300)
|
|
def profile_songs(pid):
|
|
import uuid
|
|
|
|
# Validate profile ID
|
|
if not pid or len(pid) > 255:
|
|
return jsonify({'error':'invalid_profile_id'}), 400
|
|
|
|
db = get_db()
|
|
try:
|
|
profile = db.query(Profile).filter(Profile.id == pid).first()
|
|
if not profile:
|
|
return jsonify({'error':'profile_not_found'}), 404
|
|
|
|
if request.method == 'GET':
|
|
try:
|
|
# Optimized: Get all profile songs with full song data in fewer queries
|
|
links = db.query(ProfileSong).filter(ProfileSong.profile_id==pid).all()
|
|
|
|
# Get all song IDs at once
|
|
song_ids = [l.song_id for l in links]
|
|
if not song_ids:
|
|
return jsonify([])
|
|
|
|
# Fetch all songs in one query
|
|
songs = db.query(Song).filter(Song.id.in_(song_ids)).all()
|
|
songs_dict = {s.id: s for s in songs}
|
|
|
|
# Fetch all custom keys in one query
|
|
keys = db.query(ProfileSongKey).filter(
|
|
ProfileSongKey.profile_id==pid,
|
|
ProfileSongKey.song_id.in_(song_ids)
|
|
).all()
|
|
keys_dict = {k.song_id: k.song_key for k in keys}
|
|
|
|
# Build result with full song data
|
|
result = []
|
|
for l in links:
|
|
song = songs_dict.get(l.song_id)
|
|
if not song:
|
|
continue
|
|
|
|
song_key = keys_dict.get(l.song_id, profile.default_key if profile.default_key else 'C')
|
|
result.append({
|
|
'id': song.id,
|
|
'title': song.title or '',
|
|
'artist': song.artist or '',
|
|
'band': song.band or '',
|
|
'singer': song.singer or '',
|
|
'lyrics': song.lyrics or '',
|
|
'chords': song.chords or '',
|
|
'memo': song.memo or '',
|
|
'created_at': song.created_at if hasattr(song, 'created_at') else None,
|
|
'updated_at': song.updated_at if hasattr(song, 'updated_at') else None,
|
|
'song_key': song_key,
|
|
'profile_song_id': l.id
|
|
})
|
|
return jsonify(result)
|
|
except Exception as e:
|
|
logger.error(f'Error loading profile songs for {pid}: {str(e)}')
|
|
return jsonify({'error': 'failed_to_load_songs', 'message': str(e)}), 500
|
|
d = request.get_json() or {}
|
|
song_id = d.get('song_id')
|
|
|
|
# Validate song_id
|
|
if not song_id:
|
|
return jsonify({'error': 'song_id_required'}), 400
|
|
|
|
# Check if song exists
|
|
song = db.query(Song).filter(Song.id == song_id).first()
|
|
if not song:
|
|
return jsonify({'error': 'song_not_found'}), 404
|
|
|
|
# Check if association already exists
|
|
existing = db.query(ProfileSong).filter(
|
|
ProfileSong.profile_id==pid,
|
|
ProfileSong.song_id==song_id
|
|
).first()
|
|
if existing:
|
|
return jsonify({'id':existing.id, 'exists':True})
|
|
|
|
# Create profile-song association
|
|
link_id = str(uuid.uuid4())
|
|
link = ProfileSong(id=link_id, profile_id=pid, song_id=song_id)
|
|
db.add(link)
|
|
|
|
# Save custom key if provided
|
|
song_key = d.get('song_key')
|
|
if song_key:
|
|
key_id = str(uuid.uuid4())
|
|
key_record = ProfileSongKey(id=key_id, profile_id=pid, song_id=song_id, song_key=song_key)
|
|
db.add(key_record)
|
|
|
|
db.commit()
|
|
logger.info(f'Song {song_id} added to profile {pid}')
|
|
return jsonify({'id':link.id, 'success': True})
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f'Error in profile_songs for {pid}: {str(e)}')
|
|
return jsonify({'error': 'operation_failed', 'message': str(e)}), 500
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
@app.route('/api/profiles/<pid>/songs/<sid>', methods=['GET','PUT','DELETE'])
|
|
@rate_limit(max_per_minute=300)
|
|
def profile_song_item(pid, sid):
|
|
import uuid
|
|
db = get_db()
|
|
try:
|
|
link = db.query(ProfileSong).filter(
|
|
ProfileSong.profile_id==pid,
|
|
ProfileSong.song_id==sid
|
|
).first()
|
|
if not link:
|
|
return jsonify({'error':'not_found'}),404
|
|
if request.method == 'GET':
|
|
# Get custom key if exists
|
|
key_record = db.query(ProfileSongKey).filter(
|
|
ProfileSongKey.profile_id==pid,
|
|
ProfileSongKey.song_id==sid
|
|
).first()
|
|
profile = db.query(Profile).filter(Profile.id == pid).first()
|
|
song_key = key_record.song_key if key_record else (profile.default_key if profile else 'C')
|
|
return jsonify({
|
|
'id': link.id,
|
|
'profile_id': link.profile_id,
|
|
'song_id': link.song_id,
|
|
'song_key': song_key
|
|
})
|
|
if request.method == 'PUT':
|
|
d = request.get_json() or {}
|
|
new_key = d.get('song_key')
|
|
if new_key:
|
|
# Update or create key record
|
|
key_record = db.query(ProfileSongKey).filter(
|
|
ProfileSongKey.profile_id==pid,
|
|
ProfileSongKey.song_id==sid
|
|
).first()
|
|
if key_record:
|
|
key_record.song_key = new_key
|
|
else:
|
|
key_id = str(uuid.uuid4())
|
|
key_record = ProfileSongKey(id=key_id, profile_id=pid, song_id=sid, song_key=new_key)
|
|
db.add(key_record)
|
|
db.commit()
|
|
return jsonify({'status':'ok'})
|
|
# DELETE - remove both records
|
|
db.delete(link)
|
|
key_record = db.query(ProfileSongKey).filter(
|
|
ProfileSongKey.profile_id==pid,
|
|
ProfileSongKey.song_id==sid
|
|
).first()
|
|
if key_record:
|
|
db.delete(key_record)
|
|
db.commit()
|
|
return jsonify({'status':'deleted'})
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
# External search using ChartLyrics API + Local Database
|
|
@app.route('/api/search_external')
|
|
@rate_limit(max_per_minute=20)
|
|
def search_external():
|
|
import urllib.request
|
|
import urllib.parse
|
|
import xml.etree.ElementTree as ET
|
|
|
|
q = (request.args.get('q', '') or '')[:500].strip() # Limit query length
|
|
filter_type = request.args.get('filter', 'all')
|
|
|
|
# Validate filter type
|
|
if filter_type not in ['all', 'title', 'artist', 'band']:
|
|
filter_type = 'all'
|
|
|
|
if not q:
|
|
return jsonify({'query': q, 'results': []})
|
|
|
|
results = []
|
|
|
|
# First, search local database
|
|
db = get_db()
|
|
try:
|
|
local_songs = db.query(Song).all()
|
|
|
|
def match_local(s):
|
|
q_lower = q.lower()
|
|
if filter_type == 'title':
|
|
return q_lower in (s.title or '').lower()
|
|
elif filter_type == 'artist':
|
|
return q_lower in (s.artist or '').lower()
|
|
elif filter_type == 'band':
|
|
return q_lower in (s.band or '').lower()
|
|
else: # all
|
|
return (q_lower in (s.title or '').lower() or
|
|
q_lower in (s.artist or '').lower() or
|
|
q_lower in (s.band or '').lower())
|
|
|
|
for s in local_songs:
|
|
if match_local(s):
|
|
snippet = (s.lyrics or '')[:150] + '...' if len(s.lyrics or '') > 150 else (s.lyrics or '')
|
|
results.append({
|
|
'title': s.title,
|
|
'artist': s.artist,
|
|
'band': s.band,
|
|
'snippet': snippet,
|
|
'source': 'local_db'
|
|
})
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
# Then search ChartLyrics (free API)
|
|
try:
|
|
# ChartLyrics search - expects artist and song
|
|
# We'll try different combinations based on filter
|
|
search_terms = []
|
|
|
|
if filter_type == 'artist':
|
|
search_terms.append({'artist': q, 'song': ''})
|
|
elif filter_type == 'title':
|
|
search_terms.append({'artist': '', 'song': q})
|
|
else:
|
|
# For 'all' and 'band', try both
|
|
search_terms.append({'artist': q, 'song': ''})
|
|
search_terms.append({'artist': '', 'song': q})
|
|
|
|
for term in search_terms:
|
|
try:
|
|
artist_encoded = urllib.parse.quote(term['artist'])
|
|
song_encoded = urllib.parse.quote(term['song'])
|
|
url = f'http://api.chartlyrics.com/apiv1.asmx/SearchLyric?artist={artist_encoded}&song={song_encoded}'
|
|
|
|
with urllib.request.urlopen(url, timeout=5) as response:
|
|
xml_data = response.read()
|
|
root = ET.fromstring(xml_data)
|
|
|
|
# Parse XML results (ChartLyrics returns XML)
|
|
namespace = {'cl': 'http://api.chartlyrics.com/'}
|
|
for item in root.findall('.//cl:SearchLyricResult', namespace):
|
|
title = item.find('cl:Song', namespace)
|
|
artist = item.find('cl:Artist', namespace)
|
|
lyric_id = item.find('cl:LyricId', namespace)
|
|
|
|
if title is not None and artist is not None:
|
|
# Get actual lyrics
|
|
snippet = "Preview available - click to view full lyrics"
|
|
|
|
results.append({
|
|
'title': title.text or 'Unknown',
|
|
'artist': artist.text or 'Unknown',
|
|
'band': artist.text or 'Unknown',
|
|
'snippet': snippet,
|
|
'source': 'chartlyrics',
|
|
'lyric_id': lyric_id.text if lyric_id is not None else None
|
|
})
|
|
|
|
# Limit to 5 results per search
|
|
if len([r for r in results if r.get('source') == 'chartlyrics']) >= 5:
|
|
break
|
|
except:
|
|
continue
|
|
|
|
except Exception as e:
|
|
# If external API fails, we still have local results
|
|
pass
|
|
|
|
# (Genius integration removed per user request)
|
|
|
|
# Remove duplicates and limit total results
|
|
seen = set()
|
|
unique_results = []
|
|
for r in results:
|
|
key = (r['title'].lower(), r['artist'].lower())
|
|
if key not in seen:
|
|
seen.add(key)
|
|
unique_results.append(r)
|
|
if len(unique_results) >= 10:
|
|
break
|
|
|
|
# Add Lifeway Worship search link as an additional source entry
|
|
try:
|
|
import urllib.parse as _urlparse
|
|
lifeway_url = f"https://worship.lifeway.com/findAndBuy/home?findMappable=false&searchString={_urlparse.quote(q)}"
|
|
unique_results.append({
|
|
'title': f'Search on Lifeway: {q}',
|
|
'artist': '',
|
|
'band': 'Lifeway Worship',
|
|
'snippet': 'Open Lifeway Worship results for this query',
|
|
'source': 'lifeway',
|
|
'url': lifeway_url
|
|
})
|
|
except Exception:
|
|
pass
|
|
|
|
return jsonify({
|
|
'query': q,
|
|
'filter': filter_type,
|
|
'results': unique_results,
|
|
'sources': 'Local DB + ChartLyrics (+ Lifeway link)'
|
|
})
|
|
|
|
@app.route('/api/export/<int:plan_id>')
|
|
@rate_limit(max_per_minute=10)
|
|
def export_plan(plan_id):
|
|
"""Export a plan's songs as a formatted text document."""
|
|
if plan_id < 0:
|
|
return jsonify({'error':'invalid_plan_id'}), 400
|
|
|
|
db = get_db()
|
|
try:
|
|
plan = db.query(Plan).filter(Plan.id == plan_id).first()
|
|
if not plan:
|
|
return jsonify({'error':'plan_not_found'}), 404
|
|
|
|
profile = db.query(Profile).get(plan.profile_id) if plan.profile_id else None
|
|
plan_songs = db.query(PlanSong).filter(PlanSong.plan_id==plan_id).order_by(PlanSong.order_index).all()
|
|
|
|
lines = []
|
|
lines.append('=' * 60)
|
|
lines.append(f'WORSHIP SETLIST: {plan.date}')
|
|
if profile:
|
|
lines.append(f'Profile: {profile.name} (Key: {profile.default_key})')
|
|
if plan.notes:
|
|
lines.append(f'Notes: {plan.notes}')
|
|
lines.append('=' * 60)
|
|
lines.append('')
|
|
|
|
for idx, ps in enumerate(plan_songs, 1):
|
|
song = db.query(Song).filter(Song.id == ps.song_id).first()
|
|
if not song:
|
|
continue
|
|
lines.append(f'{idx}. {song.title}')
|
|
lines.append(f' Artist: {song.artist or "Unknown"} | Band: {song.band or "Unknown"}')
|
|
lines.append('')
|
|
if song.lyrics:
|
|
lines.append(' LYRICS:')
|
|
for line in song.lyrics.splitlines():
|
|
lines.append(f' {line}')
|
|
lines.append('')
|
|
if song.chords:
|
|
lines.append(' CHORDS:')
|
|
for line in song.chords.splitlines():
|
|
lines.append(f' {line}')
|
|
lines.append('')
|
|
lines.append('-' * 60)
|
|
lines.append('')
|
|
|
|
content = '\n'.join(lines)
|
|
response = app.make_response(content)
|
|
response.headers['Content-Type'] = 'text/plain; charset=utf-8'
|
|
response.headers['Content-Disposition'] = f'attachment; filename="setlist_{plan.date}.txt"'
|
|
return response
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
pass # Session cleanup handled by teardown_appcontext
|
|
|
|
# ==============================================================================
|
|
# USER MANAGEMENT ENDPOINTS
|
|
# ==============================================================================
|
|
|
|
@app.route('/api/auth/login', methods=['POST'])
|
|
@rate_limit(max_per_minute=5) # Stricter rate limit for login
|
|
def login():
|
|
"""Authenticate user with username and password"""
|
|
data = request.get_json() or {}
|
|
username = sanitize_text(data.get('username', ''), 255).strip()
|
|
password = data.get('password', '').strip()
|
|
|
|
if not username or not password:
|
|
return jsonify({'success': False, 'error': 'username_password_required'}), 400
|
|
|
|
# Rate limiting: track failed attempts
|
|
if len(password) > 128: # Prevent DoS via long passwords
|
|
return jsonify({'success': False, 'error': 'invalid_credentials'}), 401
|
|
|
|
db = get_db()
|
|
try:
|
|
# Query User table
|
|
user = db.query(User).filter(User.username == username).first()
|
|
|
|
if not user:
|
|
# Constant-time response to prevent user enumeration
|
|
# Still hash a dummy password to prevent timing attacks
|
|
import bcrypt
|
|
bcrypt.checkpw(b'dummy', bcrypt.gensalt())
|
|
logger.warning(f'Failed login attempt for non-existent user: {username} from {request.remote_addr}')
|
|
return jsonify({'success': False, 'error': 'invalid_credentials'}), 401
|
|
|
|
# Check if user is active
|
|
if not user.active:
|
|
logger.warning(f'Login attempt for disabled account: {username} from {request.remote_addr}')
|
|
return jsonify({'success': False, 'error': 'account_disabled'}), 403
|
|
|
|
# Verify password
|
|
if user.check_password(password):
|
|
# SECURITY: Regenerate session ID to prevent session fixation
|
|
old_session_data = dict(session)
|
|
session.clear()
|
|
|
|
# Update last login
|
|
user.last_login = datetime.now()
|
|
db.commit()
|
|
|
|
# Set new session with regenerated ID
|
|
session['username'] = user.username
|
|
session['role'] = user.role
|
|
session['permissions'] = user.get_permissions_list()
|
|
session['login_time'] = datetime.now().isoformat()
|
|
session.permanent = True
|
|
|
|
logger.info(f'Successful login for {username} from {request.remote_addr}')
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'username': user.username,
|
|
'role': user.role,
|
|
'permissions': user.get_permissions_list()
|
|
})
|
|
else:
|
|
logger.warning(f'Failed login attempt for {username} from {request.remote_addr}')
|
|
return jsonify({'success': False, 'error': 'invalid_credentials'}), 401
|
|
|
|
except Exception as e:
|
|
logger.error(f"Login error: {e}")
|
|
return jsonify({'success': False, 'error': 'server_error'}), 500
|
|
|
|
@app.route('/api/auth/logout', methods=['POST'])
|
|
@rate_limit(max_per_minute=30)
|
|
def logout():
|
|
"""Logout current user"""
|
|
session.clear()
|
|
return jsonify({'success': True})
|
|
|
|
@app.route('/api/users', methods=['GET', 'POST'])
|
|
@rate_limit(max_per_minute=30)
|
|
@require_auth
|
|
@require_permission('settings')
|
|
def users():
|
|
"""Get all users or create a new user"""
|
|
db = get_db()
|
|
try:
|
|
if request.method == 'GET':
|
|
# Check if requester is admin or has settings permission
|
|
username = session.get('username')
|
|
if not username:
|
|
return jsonify({'error': 'unauthorized', 'message': 'Not logged in'}), 401
|
|
|
|
user = db.query(User).filter(User.username == username).first()
|
|
if not user or (user.role != 'admin' and not user.has_permission('settings')):
|
|
return jsonify({'error': 'admin_only', 'message': 'Admin access or Settings permission required'}), 403
|
|
|
|
# Get all users
|
|
all_users = db.query(User).all()
|
|
return jsonify([{
|
|
'id': u.id,
|
|
'username': u.username,
|
|
'role': u.role,
|
|
'permissions': u.get_permissions_list(),
|
|
'active': u.active,
|
|
'created_at': u.created_at.isoformat() if u.created_at else None,
|
|
'last_login': u.last_login.isoformat() if u.last_login else None
|
|
} for u in all_users])
|
|
|
|
# POST - Create new user
|
|
# Check if requester is admin or has settings permission
|
|
username_auth = session.get('username')
|
|
if not username_auth:
|
|
return jsonify({'error': 'unauthorized', 'message': 'Not logged in'}), 401
|
|
|
|
user_auth = db.query(User).filter(User.username == username_auth).first()
|
|
if not user_auth or (user_auth.role != 'admin' and not user_auth.has_permission('settings')):
|
|
return jsonify({'error': 'admin_only', 'message': 'Admin access or Settings permission required'}), 403
|
|
|
|
data = request.get_json() or {}
|
|
username = data.get('username', '').strip()
|
|
password = data.get('password', '').strip()
|
|
role = data.get('role', 'viewer').strip()
|
|
permissions = data.get('permissions', ['view'])
|
|
|
|
if not username or not password:
|
|
return jsonify({'error': 'username_password_required'}), 400
|
|
|
|
# Check if username exists
|
|
existing = db.query(User).filter(User.username == username).first()
|
|
if existing:
|
|
return jsonify({'error': 'username_exists'}), 409
|
|
|
|
# Create user
|
|
new_user = User(
|
|
username=username,
|
|
role=role,
|
|
permissions=','.join(permissions) if isinstance(permissions, list) else permissions,
|
|
active=True
|
|
)
|
|
new_user.set_password(password)
|
|
|
|
db.add(new_user)
|
|
db.commit()
|
|
db.refresh(new_user)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'id': new_user.id,
|
|
'username': new_user.username,
|
|
'role': new_user.role,
|
|
'permissions': new_user.get_permissions_list(),
|
|
'active': new_user.active
|
|
}), 201
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"User management error: {e}")
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
pass
|
|
|
|
@app.route('/api/users/<user_id>', methods=['GET', 'PUT', 'DELETE'])
|
|
@rate_limit(max_per_minute=30)
|
|
@require_auth
|
|
@require_permission('settings')
|
|
def user_detail(user_id):
|
|
"""Get, update, or delete a specific user"""
|
|
db = get_db()
|
|
try:
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
return jsonify({'error': 'user_not_found'}), 404
|
|
|
|
if request.method == 'GET':
|
|
return jsonify({
|
|
'id': user.id,
|
|
'username': user.username,
|
|
'role': user.role,
|
|
'permissions': user.get_permissions_list(),
|
|
'active': user.active,
|
|
'created_at': user.created_at.isoformat() if user.created_at else None,
|
|
'last_login': user.last_login.isoformat() if user.last_login else None
|
|
})
|
|
|
|
# Check admin access for PUT/DELETE
|
|
auth_user = session.get('username')
|
|
auth_user_obj = db.query(User).filter(User.username == auth_user).first()
|
|
if not auth_user_obj or (auth_user_obj.role != 'admin' and not auth_user_obj.has_permission('settings')):
|
|
return jsonify({'error': 'admin_only', 'message': 'Admin access or Settings permission required'}), 403
|
|
|
|
if request.method == 'PUT':
|
|
data = request.get_json() or {}
|
|
|
|
# Update username if provided and different
|
|
if 'username' in data and data['username'] and data['username'] != user.username:
|
|
new_username = data['username'].strip()
|
|
# Check if new username already exists
|
|
existing = db.query(User).filter(User.username == new_username, User.id != user_id).first()
|
|
if existing:
|
|
return jsonify({'success': False, 'error': 'username_already_exists'}), 409
|
|
|
|
# Update biometric credentials to point to new username
|
|
from postgresql_models import BiometricCredential
|
|
biometric_creds = db.query(BiometricCredential).filter(BiometricCredential.user_id == user.id).all()
|
|
for cred in biometric_creds:
|
|
cred.username = new_username
|
|
|
|
user.username = new_username
|
|
|
|
# Update password if provided
|
|
if 'password' in data and data['password']:
|
|
user.set_password(data['password'])
|
|
|
|
# Update role if provided
|
|
if 'role' in data:
|
|
user.role = data['role']
|
|
|
|
# Update permissions if provided
|
|
if 'permissions' in data:
|
|
perms = data['permissions']
|
|
user.permissions = ','.join(perms) if isinstance(perms, list) else perms
|
|
|
|
# Update active status if provided
|
|
if 'active' in data:
|
|
user.active = bool(data['active'])
|
|
|
|
user.updated_at = datetime.now()
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'id': user.id,
|
|
'username': user.username,
|
|
'role': user.role,
|
|
'permissions': user.get_permissions_list(),
|
|
'active': user.active
|
|
})
|
|
|
|
# DELETE
|
|
db.delete(user)
|
|
db.commit()
|
|
return jsonify({'success': True, 'status': 'deleted'})
|
|
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"User detail error: {e}")
|
|
return jsonify({'error': str(e)}), 500
|
|
finally:
|
|
pass
|
|
|
|
@app.route('/api/auth/check', methods=['GET'])
|
|
@rate_limit(max_per_minute=60)
|
|
def check_auth():
|
|
"""Check if current session is authenticated and return user info"""
|
|
username = session.get('username')
|
|
role = session.get('role')
|
|
|
|
if not username:
|
|
return jsonify({'authenticated': False}), 401
|
|
|
|
# Validate session hasn't expired (additional check)
|
|
login_time_str = session.get('login_time')
|
|
if login_time_str:
|
|
from datetime import datetime, timedelta
|
|
try:
|
|
login_time = datetime.fromisoformat(login_time_str)
|
|
if datetime.now() - login_time > timedelta(hours=8):
|
|
session.clear()
|
|
return jsonify({'authenticated': False, 'reason': 'session_expired'}), 401
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
db = get_db()
|
|
try:
|
|
# Check User table first
|
|
user = db.query(User).filter(User.username == username).first()
|
|
|
|
if user:
|
|
# User found in database
|
|
if not user.active:
|
|
session.clear()
|
|
return jsonify({'authenticated': False}), 401
|
|
|
|
# Update session with latest user data
|
|
session['role'] = user.role
|
|
session['permissions'] = user.get_permissions_list()
|
|
|
|
return jsonify({
|
|
'authenticated': True,
|
|
'username': user.username,
|
|
'role': user.role,
|
|
'permissions': user.get_permissions_list()
|
|
})
|
|
else:
|
|
# Fallback: session exists but user not in database yet
|
|
# This handles legacy logins - return session data
|
|
if role:
|
|
return jsonify({
|
|
'authenticated': True,
|
|
'username': username,
|
|
'role': role,
|
|
'permissions': session.get('permissions', ['view'])
|
|
})
|
|
else:
|
|
# Invalid session
|
|
session.clear()
|
|
return jsonify({'authenticated': False}), 401
|
|
except Exception as e:
|
|
logger.error(f"Auth check error: {e}")
|
|
return jsonify({'authenticated': False}), 500
|
|
|
|
# ==============================================================================
|
|
# BIOMETRIC AUTHENTICATION ENDPOINTS
|
|
# ==============================================================================
|
|
|
|
@app.route('/api/biometric/register', methods=['POST'])
|
|
@rate_limit(max_per_minute=10)
|
|
def register_biometric():
|
|
"""Register biometric for a user"""
|
|
try:
|
|
data = request.json
|
|
username = data.get('username')
|
|
device_name = data.get('deviceName', 'Unknown Device')
|
|
device_fingerprint = data.get('deviceFingerprint')
|
|
device_info = data.get('deviceInfo', '')
|
|
|
|
if not all([username, device_fingerprint]):
|
|
return jsonify({'error': 'Missing username or device fingerprint'}), 400
|
|
|
|
result = register_biometric_credential(
|
|
username, device_name, device_fingerprint, device_info
|
|
)
|
|
|
|
if result['success']:
|
|
return jsonify(result)
|
|
else:
|
|
return jsonify(result), 400
|
|
except Exception as e:
|
|
logger.error(f"Error in biometric registration: {e}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/biometric/authenticate', methods=['POST'])
|
|
@rate_limit(max_per_minute=30)
|
|
def authenticate_biometric_endpoint():
|
|
"""Authenticate using biometric - device already verified fingerprint"""
|
|
try:
|
|
data = request.json
|
|
username = data.get('username') # Optional - can lookup by device fingerprint alone
|
|
device_fingerprint = data.get('deviceFingerprint')
|
|
|
|
logger.info(f"Biometric auth attempt for {username or 'device lookup'} from device {device_fingerprint}")
|
|
|
|
if not device_fingerprint:
|
|
logger.error("Missing device fingerprint")
|
|
return jsonify({'error': 'Device fingerprint required'}), 400
|
|
|
|
result = authenticate_biometric_by_device(username, device_fingerprint)
|
|
|
|
if result['success']:
|
|
# Set Flask session - use username from result (may have been looked up by device)
|
|
try:
|
|
username = result['username']
|
|
session['authenticated'] = True
|
|
session['username'] = username
|
|
session['role'] = result['role']
|
|
session['user_id'] = username # Could get actual user_id if needed
|
|
session['auth_time'] = datetime.now().isoformat()
|
|
|
|
logger.info(f"Biometric auth successful for {username}, role: {result['role']}")
|
|
except Exception as session_err:
|
|
logger.error(f"Error setting session: {session_err}")
|
|
|
|
return jsonify(result)
|
|
else:
|
|
logger.warning(f"Biometric auth failed for {username}: {result.get('error')}")
|
|
return jsonify(result), 401
|
|
except Exception as e:
|
|
logger.error(f"Error in biometric authentication: {e}", exc_info=True)
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/biometric/status/<username>', methods=['GET'])
|
|
@rate_limit(max_per_minute=30)
|
|
def get_biometric_status(username):
|
|
"""Get biometric status for a user"""
|
|
try:
|
|
result = get_user_biometric_status(username)
|
|
return jsonify(result)
|
|
except Exception as e:
|
|
logger.error(f"Error getting biometric status: {e}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
result = get_user_credentials(username)
|
|
return jsonify(result)
|
|
except Exception as e:
|
|
logger.error(f"Error fetching credentials: {e}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/biometric/credentials/<credential_id>', methods=['PUT'])
|
|
@rate_limit(max_per_minute=30)
|
|
def toggle_biometric_credential(credential_id):
|
|
"""Enable or disable a biometric credential"""
|
|
try:
|
|
data = request.json
|
|
enabled = data.get('enabled', True)
|
|
|
|
result = toggle_credential(credential_id, enabled)
|
|
|
|
if result['success']:
|
|
return jsonify(result)
|
|
else:
|
|
return jsonify(result), 404
|
|
except Exception as e:
|
|
logger.error(f"Error toggling credential: {e}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/biometric/credentials/<credential_id>', methods=['DELETE'])
|
|
@rate_limit(max_per_minute=30)
|
|
def delete_biometric_credential(credential_id):
|
|
"""Delete a biometric credential"""
|
|
try:
|
|
result = delete_credential(credential_id)
|
|
|
|
if result['success']:
|
|
return jsonify(result)
|
|
else:
|
|
return jsonify(result), 404
|
|
except Exception as e:
|
|
logger.error(f"Error deleting credential: {e}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/api/biometric/remove/<username>', methods=['DELETE'])
|
|
@rate_limit(max_per_minute=30)
|
|
def remove_user_biometric(username):
|
|
"""Remove all biometric credentials for a user"""
|
|
try:
|
|
result = delete_all_user_biometrics(username)
|
|
|
|
if result['success']:
|
|
return jsonify(result)
|
|
else:
|
|
return jsonify(result), 500
|
|
except Exception as e:
|
|
logger.error(f"Error removing biometric: {e}")
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
if __name__ == '__main__':
|
|
# Production mode: disable debug
|
|
app.run(host='0.0.0.0', port=FLASK_PORT, debug=False)
|