Files
Church-Music/legacy-site/backend/helpers.py

231 lines
7.2 KiB
Python

"""
Database and utility helper functions for Flask app
Extracts common patterns to reduce code duplication
"""
from flask import jsonify
from functools import wraps
import re
import logging
logger = logging.getLogger(__name__)
# ============================================================================
# Response Helpers
# ============================================================================
def success_response(data=None, status=200):
"""Standard success response"""
if data is None:
data = {'status': 'ok'}
return jsonify(data), status
def error_response(error_code, message=None, status=400):
"""Standard error response"""
response = {'error': error_code}
if message:
response['message'] = message
return jsonify(response), status
def not_found_response(resource='Resource'):
"""Standard 404 response"""
return error_response('not_found', f'{resource} not found', 404)
def validation_error(field, message='Invalid or missing field'):
"""Standard validation error"""
return error_response('validation_error', f'{field}: {message}', 400)
# ============================================================================
# Input Sanitization
# ============================================================================
def sanitize_text(text, max_length=None, remove_scripts=True):
"""Sanitize text input by removing scripts and limiting length"""
if not text:
return ''
text = str(text).strip()
if remove_scripts:
text = re.sub(r'<script[^>]*>.*?</script>', '', text, flags=re.IGNORECASE | re.DOTALL)
if max_length:
text = text[:max_length]
return text
def validate_id(id_value, max_length=255):
"""Validate ID format and length"""
if not id_value:
return False
return len(str(id_value)) <= max_length
# ============================================================================
# Database Helpers
# ============================================================================
def get_or_404(query, error_msg='Resource not found'):
"""Get item from query or return 404"""
item = query.first()
if not item:
raise NotFoundError(error_msg)
return item
class NotFoundError(Exception):
"""Exception for 404 cases"""
pass
def safe_db_operation(db, operation_func):
"""
Safely execute database operation with automatic rollback on error
Args:
db: Database session
operation_func: Function that performs DB operations
Returns:
Tuple of (success: bool, result: any, error: str or None)
"""
try:
result = operation_func()
db.commit()
return True, result, None
except NotFoundError as e:
db.rollback()
return False, None, str(e)
except Exception as e:
db.rollback()
logger.error(f"Database operation failed: {e}")
return False, None, str(e)
# ============================================================================
# Model Serialization
# ============================================================================
def serialize_profile(profile, include_song_count=False, db=None):
"""Serialize Profile model to dict"""
data = {
'id': profile.id,
'name': profile.name,
'first_name': profile.first_name,
'last_name': profile.last_name,
'default_key': profile.default_key,
'email': profile.email or '',
'contact_number': profile.contact_number or '',
'notes': profile.notes or ''
}
if include_song_count and db:
from postgresql_models import ProfileSong
data['song_count'] = db.query(ProfileSong).filter(
ProfileSong.profile_id == profile.id
).count()
return data
def serialize_song(song, include_full_content=False):
"""Serialize Song model to dict"""
data = {
'id': song.id,
'title': song.title,
'artist': song.artist,
'band': song.band,
'singer': song.singer
}
if include_full_content:
data['lyrics'] = song.lyrics or ''
data['chords'] = song.chords or ''
else:
# Preview only
data['lyrics'] = (song.lyrics or '')[:200] if song.lyrics else ''
data['chords'] = (song.chords or '')[:100] if song.chords else ''
return data
def serialize_plan(plan):
"""Serialize Plan model to dict"""
return {
'id': plan.id,
'date': plan.date,
'profile_id': plan.profile_id,
'notes': plan.notes or ''
}
# ============================================================================
# Data Extraction and Validation
# ============================================================================
def extract_profile_data(data):
"""Extract and validate profile data from request"""
return {
'name': sanitize_text(data.get('name'), 255),
'first_name': sanitize_text(data.get('first_name'), 255),
'last_name': sanitize_text(data.get('last_name'), 255),
'default_key': sanitize_text(data.get('default_key', 'C'), 10),
'email': sanitize_text(data.get('email'), 255),
'contact_number': sanitize_text(data.get('contact_number'), 50),
'notes': sanitize_text(data.get('notes'), 5000)
}
def extract_song_data(data):
"""Extract and validate song data from request"""
return {
'title': sanitize_text(data.get('title', 'Untitled'), 500),
'artist': sanitize_text(data.get('artist'), 500),
'band': sanitize_text(data.get('band'), 500),
'singer': sanitize_text(data.get('singer'), 500),
'lyrics': data.get('lyrics', ''),
'chords': data.get('chords', '')
}
def extract_plan_data(data):
"""Extract and validate plan data from request"""
return {
'date': sanitize_text(data.get('date'), 50),
'profile_id': data.get('profile_id'),
'notes': sanitize_text(data.get('notes'), 5000)
}
# ============================================================================
# Query Helpers
# ============================================================================
def search_songs(db, Song, query_string=''):
"""Search songs by query string - SQL injection safe"""
items = db.query(Song).all()
if not query_string:
return items
# Sanitize and limit query length
q = str(query_string)[:500].lower().strip()
# Remove any SQL-like characters that could be injection attempts
q = re.sub(r'[;\\"\']', '', q)
def matches(song):
searchable = [
song.title or '',
song.artist or '',
song.band or '',
song.singer or ''
]
return any(q in field.lower() for field in searchable)
return [s for s in items if matches(s)]
def update_model_fields(model, data, field_mapping=None):
"""
Update model fields from data dict
Args:
model: SQLAlchemy model instance
data: Dict with new values
field_mapping: Optional dict mapping data keys to model attributes
"""
if field_mapping is None:
field_mapping = {k: k for k in data.keys()}
for data_key, model_attr in field_mapping.items():
if data_key in data:
setattr(model, model_attr, data[data_key])