Files

576 lines
24 KiB
Python

from flask import Flask, jsonify, request
from flask_cors import CORS
from datetime import datetime
from models import init_db, SessionLocal, Profile, Song, Plan, PlanSong, ProfileSong
try:
from dotenv import load_dotenv
load_dotenv() # Loads variables from a .env file if present
except ImportError:
# dotenv not installed yet; proceed without loading .env
pass
import os
import json
app = Flask(__name__)
# Enhanced CORS with wildcard support for all origins
CORS(app, resources={
r"/api/*": {
"origins": "*",
"methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
"allow_headers": ["Content-Type", "Authorization"]
}
})
init_db()
FLASK_PORT = int(os.environ.get('FLASK_PORT', '5000'))
def get_db():
"""Create a new database session with automatic cleanup"""
db = SessionLocal()
try:
return db
except Exception:
db.close()
raise
@app.teardown_appcontext
def cleanup_session(exception=None):
"""Cleanup scoped database sessions after each request to prevent connection pool exhaustion"""
SessionLocal.remove()
def extract_text_from_file(file_storage):
"""Extract text from an uploaded file (docx, pdf, image, txt)."""
filename = file_storage.filename or ''
lower = filename.lower()
data = file_storage.read()
# Reset stream position if needed for certain libraries
import io
stream = io.BytesIO(data)
text = ''
try:
if lower.endswith('.txt'):
text = data.decode(errors='ignore')
elif lower.endswith('.docx'):
try:
import docx
doc = docx.Document(stream)
text = '\n'.join(p.text for p in doc.paragraphs)
except Exception:
text = ''
elif lower.endswith('.pdf'):
try:
import PyPDF2
reader = PyPDF2.PdfReader(stream)
parts = []
for page in reader.pages[:20]: # safety limit
try:
parts.append(page.extract_text() or '')
except Exception:
continue
text = '\n'.join(parts)
# OCR fallback if minimal text extracted (likely scanned PDF)
if len(text.strip()) < 50:
try:
from pdf2image import convert_from_bytes
from PIL import Image
import pytesseract
images = convert_from_bytes(data, first_page=1, last_page=min(10, len(reader.pages)))
ocr_parts = []
for img in images:
ocr_parts.append(pytesseract.image_to_string(img))
ocr_text = '\n'.join(ocr_parts)
if len(ocr_text.strip()) > len(text.strip()):
text = ocr_text
except Exception:
pass # Fall back to original text layer extraction
except Exception:
text = ''
elif lower.endswith(('.png','.jpg','.jpeg','.tif','.tiff')):
try:
from PIL import Image
import pytesseract
img = Image.open(stream)
text = pytesseract.image_to_string(img)
except Exception:
text = ''
else:
# Attempt generic decode
try:
text = data.decode(errors='ignore')
except Exception:
text = ''
except Exception:
text = ''
# Basic cleanup
cleaned = '\n'.join(line.rstrip() for line in (text or '').splitlines())
return cleaned.strip()
# Admin restore from data.json (profiles and songs)
@app.route('/api/admin/restore', methods=['POST'])
def admin_restore():
# Determine data.json location: prefer backend/data.json, fallback to project root
backend_path = os.path.join(os.path.dirname(__file__), 'data.json')
root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data.json'))
source_path = backend_path if os.path.exists(backend_path) else root_path if os.path.exists(root_path) else None
if not source_path:
return jsonify({ 'ok': False, 'error': 'data.json not found in backend/ or project root' }), 404
try:
with open(source_path, 'r', encoding='utf-8') as f:
payload = json.load(f)
except Exception as e:
return jsonify({ 'ok': False, 'error': f'Failed to read data.json: {str(e)}' }), 400
db = SessionLocal()
created = { 'profiles': 0, 'songs': 0 }
updated = { 'profiles': 0, 'songs': 0 }
try:
import uuid
# Restore profiles
profiles = payload.get('profiles', [])
for p in profiles:
name = (p.get('name') or f"{(p.get('first_name') or '').strip()} {(p.get('last_name') or '').strip()}" ).strip()
if not name:
continue
existing = db.query(Profile).filter(Profile.name == name).first()
if existing:
changed = False
for key in ['email','contact_number','default_key','notes']:
if p.get(key) and getattr(existing, key, None) != p.get(key):
setattr(existing, key, p.get(key))
changed = True
if changed:
updated['profiles'] += 1
else:
profile_id = str(p.get('id')) if p.get('id') else str(uuid.uuid4())
new_p = Profile(
id=profile_id,
name=name,
email=p.get('email') or '',
contact_number=p.get('contact_number') or '',
default_key=p.get('default_key') or 'C',
notes=p.get('notes') or ''
)
db.add(new_p)
created['profiles'] += 1
# Restore songs
songs = payload.get('songs', [])
for s in songs:
title = (s.get('title') or '').strip()
if not title:
continue
existing = db.query(Song).filter(Song.title == title).first()
lyrics = s.get('lyrics') or s.get('content') or ''
chords = s.get('chords') or ''
singer = s.get('singer') or s.get('artist') or ''
artist = s.get('artist') or ''
band = s.get('band') or ''
if existing:
changed = False
if lyrics and (existing.lyrics or '') != lyrics:
existing.lyrics = lyrics
changed = True
if chords and (existing.chords or '') != chords:
existing.chords = chords
changed = True
if singer and (existing.singer or '') != singer:
existing.singer = singer
changed = True
if artist and (existing.artist or '') != artist:
existing.artist = artist
changed = True
if band and (existing.band or '') != band:
existing.band = band
changed = True
if changed:
updated['songs'] += 1
else:
song_id = str(s.get('id')) if s.get('id') else str(uuid.uuid4())
new_s = Song(id=song_id, title=title, artist=artist, band=band, singer=singer, lyrics=lyrics, chords=chords)
db.add(new_s)
created['songs'] += 1
db.commit()
return jsonify({ 'ok': True, 'profiles_created': created['profiles'], 'profiles_updated': updated['profiles'], 'songs_created': created['songs'], 'songs_updated': updated['songs'], 'source': source_path })
except Exception as e:
db.rollback()
return jsonify({ 'ok': False, 'error': str(e) }), 500
finally:
db.close()
@app.route('/api/upload_lyric', methods=['POST'])
def upload_lyric():
"""Accept a lyric file and return extracted text. Does NOT create a Song automatically."""
if 'file' not in request.files:
return jsonify({'error':'file_missing'}), 400
f = request.files['file']
if not f.filename:
return jsonify({'error':'empty_filename'}), 400
# Optional metadata fields from form
title = request.form.get('title') or f.filename.rsplit('.',1)[0]
artist = request.form.get('artist') or ''
band = request.form.get('band') or ''
extracted = extract_text_from_file(f)
if not extracted:
return jsonify({'error':'extraction_failed','title':title}), 422
# Return without saving; front-end will present in modal for editing & saving
sample = extracted[:150] + ('...' if len(extracted) > 150 else '')
return jsonify({
'status':'ok',
'title': title,
'artist': artist,
'band': band,
'lyrics': extracted,
'preview': sample,
'length': len(extracted)
})
@app.route('/api/providers')
def providers():
"""Report which external provider tokens are configured (OpenAI removed)."""
return jsonify({
'local_db': True,
'chartlyrics': True,
'lifeway_link': True
})
@app.route('/')
def index():
return jsonify({'message': 'House of Prayer Song Lyrics API (Flask)', 'port': FLASK_PORT})
@app.route('/api/health')
def health():
return jsonify({'status': 'ok', 'ts': datetime.utcnow().isoformat()})
# Profile Selection Management
@app.route('/api/profile-selection/clear', methods=['POST', 'OPTIONS'])
def clear_profile_selection():
"""Clear profile selection (frontend manages this in localStorage, but endpoint needed for API compatibility)"""
if request.method == 'OPTIONS':
return '', 204
return jsonify({'status': 'ok', 'message': 'Profile selection cleared'})
# Profiles CRUD
@app.route('/api/profiles', methods=['GET','POST'])
def profiles():
db = get_db()
if request.method == 'GET':
items = db.query(Profile).all()
result = jsonify([{'id':p.id,'name':p.name,'email':p.email,'contact_number':p.contact_number,'default_key':p.default_key,'notes':p.notes} for p in items])
db.close()
return result
import uuid
data = request.get_json() or {}
profile_id = data.get('id') or str(uuid.uuid4())
p = Profile(id=profile_id, name=data.get('name'), email=data.get('email') or '', contact_number=data.get('contact_number') or '', default_key=data.get('default_key') or 'C', notes=data.get('notes') or '')
db.add(p); db.commit();
result = jsonify({'id':p.id})
db.close()
return result
@app.route('/api/profiles/<pid>', methods=['PUT','DELETE'])
def profile_item(pid):
db = get_db(); p = db.query(Profile).get(pid)
if not p: return jsonify({'error':'not_found'}),404
if request.method == 'PUT':
d = request.get_json() or {}
p.name = d.get('name', p.name)
p.email = d.get('email', p.email)
p.contact_number = d.get('contact_number', p.contact_number)
p.default_key = d.get('default_key', p.default_key)
p.notes = d.get('notes', p.notes)
db.commit(); return jsonify({'status':'ok'})
db.delete(p); db.commit(); return jsonify({'status':'deleted'})
# Songs CRUD + search (local)
@app.route('/api/songs', methods=['GET','POST'])
def songs():
db = get_db()
if request.method == 'GET':
q = request.args.get('q','').lower()
items = db.query(Song).all()
def match(s):
return (q in (s.title or '').lower() or q in (s.artist or '').lower() or q in (s.band or '').lower() or q in (s.singer or '').lower()) if q else True
# Include lyrics preview (first 200 chars) for Database component
return jsonify([{
'id':s.id,
'title':s.title,
'artist':s.artist,
'band':s.band,
'singer':s.singer,
'lyrics':(s.lyrics or '')[:200] if s.lyrics else '',
'chords':(s.chords or '')[:100] if s.chords else ''
} for s in items if match(s)])
import uuid
d = request.get_json() or {}
song_id = d.get('id') or str(uuid.uuid4())
s = Song(id=song_id, title=d.get('title') or 'Untitled', artist=d.get('artist') or '', band=d.get('band') or '', singer=d.get('singer') or '', lyrics=d.get('lyrics') or '', chords=d.get('chords') or '')
db.add(s); db.commit(); return jsonify({'id':s.id})
@app.route('/api/songs/<sid>', methods=['GET','PUT','DELETE'])
def song_item(sid):
db = get_db(); s = db.query(Song).get(sid)
if not s: return jsonify({'error':'not_found'}),404
if request.method == 'GET':
return jsonify({'id':s.id,'title':s.title,'artist':s.artist,'band':s.band,'singer':s.singer,'lyrics':s.lyrics,'chords':s.chords})
if request.method == 'PUT':
d = request.get_json() or {}
s.title = d.get('title', s.title)
s.artist = d.get('artist', s.artist)
s.band = d.get('band', s.band)
s.singer = d.get('singer', s.singer)
s.lyrics = d.get('lyrics', s.lyrics)
s.chords = d.get('chords', s.chords)
db.commit(); return jsonify({'status':'ok'})
db.delete(s); db.commit(); return jsonify({'status':'deleted'})
# Planning (date-based)
@app.route('/api/plans', methods=['GET','POST'])
def plans():
db = get_db()
if request.method == 'GET':
items = db.query(Plan).all()
return jsonify([{'id':p.id,'date':p.date.isoformat(),'profile_id':p.profile_id,'memo':p.memo} for p in items])
d = request.get_json() or {}
date_str = d.get('date'); date = datetime.fromisoformat(date_str).date() if date_str else datetime.now().date()
plan = Plan(date=date, profile_id=d.get('profile_id'), memo=d.get('memo') or '')
db.add(plan); db.commit(); return jsonify({'id':plan.id})
@app.route('/api/plans/<int:pid>/songs', methods=['GET','POST'])
def plan_songs(pid):
db = get_db(); plan = db.query(Plan).get(pid)
if not plan: return jsonify({'error':'plan_not_found'}),404
if request.method == 'GET':
links = db.query(PlanSong).filter(PlanSong.plan_id==pid).order_by(PlanSong.order_index).all()
return jsonify([{'id':l.id,'song_id':l.song_id,'order_index':l.order_index} for l in links])
d = request.get_json() or {}
link = PlanSong(plan_id=pid, song_id=d.get('song_id'), order_index=d.get('order_index') or 0)
db.add(link); db.commit(); return jsonify({'id':link.id})
# Profile Songs endpoints
@app.route('/api/profiles/<pid>/songs', methods=['GET','POST'])
def profile_songs(pid):
db = get_db()
try:
profile = db.query(Profile).get(pid)
if not profile:
return jsonify({'error':'profile_not_found'}),404
if request.method == 'GET':
links = db.query(ProfileSong).filter(ProfileSong.profile_id==pid).all()
return jsonify([{'id':l.id,'profile_id':l.profile_id,'song_id':l.song_id,'song_key':l.song_key} for l in links])
d = request.get_json() or {}
# Check if association already exists
existing = db.query(ProfileSong).filter(ProfileSong.profile_id==pid, ProfileSong.song_id==d.get('song_id')).first()
if existing:
return jsonify({'id':existing.id, 'exists':True})
link = ProfileSong(profile_id=pid, song_id=d.get('song_id'), song_key=d.get('song_key') or 'C')
db.add(link); db.commit()
return jsonify({'id':link.id})
finally:
db.close()
@app.route('/api/profiles/<pid>/songs/<sid>', methods=['GET','PUT','DELETE'])
def profile_song_item(pid, sid):
db = get_db()
try:
link = db.query(ProfileSong).filter(ProfileSong.profile_id==pid, ProfileSong.song_id==sid).first()
if not link:
return jsonify({'error':'not_found'}),404
if request.method == 'GET':
return jsonify({'id':link.id,'profile_id':link.profile_id,'song_id':link.song_id,'song_key':link.song_key})
if request.method == 'PUT':
d = request.get_json() or {}
link.song_key = d.get('song_key', link.song_key)
db.commit()
return jsonify({'status':'ok'})
db.delete(link); db.commit()
return jsonify({'status':'deleted'})
finally:
db.close()
# External search using ChartLyrics API + Local Database
@app.route('/api/search_external')
def search_external():
import urllib.request
import urllib.parse
import xml.etree.ElementTree as ET
q = request.args.get('q', '')
filter_type = request.args.get('filter', 'all')
if not q:
return jsonify({'query': q, 'results': []})
results = []
# First, search local database
db = get_db()
local_songs = db.query(Song).all()
def match_local(s):
q_lower = q.lower()
if filter_type == 'title':
return q_lower in (s.title or '').lower()
elif filter_type == 'artist':
return q_lower in (s.artist or '').lower()
elif filter_type == 'band':
return q_lower in (s.band or '').lower()
else: # all
return (q_lower in (s.title or '').lower() or
q_lower in (s.artist or '').lower() or
q_lower in (s.band or '').lower())
for s in local_songs:
if match_local(s):
snippet = (s.lyrics or '')[:150] + '...' if len(s.lyrics or '') > 150 else (s.lyrics or '')
results.append({
'title': s.title,
'artist': s.artist,
'band': s.band,
'snippet': snippet,
'source': 'local_db'
})
# Then search ChartLyrics (free API)
try:
# ChartLyrics search - expects artist and song
# We'll try different combinations based on filter
search_terms = []
if filter_type == 'artist':
search_terms.append({'artist': q, 'song': ''})
elif filter_type == 'title':
search_terms.append({'artist': '', 'song': q})
else:
# For 'all' and 'band', try both
search_terms.append({'artist': q, 'song': ''})
search_terms.append({'artist': '', 'song': q})
for term in search_terms:
try:
artist_encoded = urllib.parse.quote(term['artist'])
song_encoded = urllib.parse.quote(term['song'])
url = f'http://api.chartlyrics.com/apiv1.asmx/SearchLyric?artist={artist_encoded}&song={song_encoded}'
with urllib.request.urlopen(url, timeout=5) as response:
xml_data = response.read()
root = ET.fromstring(xml_data)
# Parse XML results (ChartLyrics returns XML)
namespace = {'cl': 'http://api.chartlyrics.com/'}
for item in root.findall('.//cl:SearchLyricResult', namespace):
title = item.find('cl:Song', namespace)
artist = item.find('cl:Artist', namespace)
lyric_id = item.find('cl:LyricId', namespace)
if title is not None and artist is not None:
# Get actual lyrics
snippet = "Preview available - click to view full lyrics"
results.append({
'title': title.text or 'Unknown',
'artist': artist.text or 'Unknown',
'band': artist.text or 'Unknown',
'snippet': snippet,
'source': 'chartlyrics',
'lyric_id': lyric_id.text if lyric_id is not None else None
})
# Limit to 5 results per search
if len([r for r in results if r.get('source') == 'chartlyrics']) >= 5:
break
except:
continue
except Exception as e:
# If external API fails, we still have local results
pass
# (Genius integration removed per user request)
# Remove duplicates and limit total results
seen = set()
unique_results = []
for r in results:
key = (r['title'].lower(), r['artist'].lower())
if key not in seen:
seen.add(key)
unique_results.append(r)
if len(unique_results) >= 10:
break
# Add Lifeway Worship search link as an additional source entry
try:
import urllib.parse as _urlparse
lifeway_url = f"https://worship.lifeway.com/findAndBuy/home?findMappable=false&searchString={_urlparse.quote(q)}"
unique_results.append({
'title': f'Search on Lifeway: {q}',
'artist': '',
'band': 'Lifeway Worship',
'snippet': 'Open Lifeway Worship results for this query',
'source': 'lifeway',
'url': lifeway_url
})
except Exception:
pass
return jsonify({
'query': q,
'filter': filter_type,
'results': unique_results,
'sources': 'Local DB + ChartLyrics (+ Lifeway link)'
})
@app.route('/api/export/<int:plan_id>')
def export_plan(plan_id):
"""Export a plan's songs as a formatted text document."""
db = get_db()
plan = db.query(Plan).get(plan_id)
if not plan:
return jsonify({'error':'plan_not_found'}), 404
profile = db.query(Profile).get(plan.profile_id) if plan.profile_id else None
plan_songs = db.query(PlanSong).filter(PlanSong.plan_id==plan_id).order_by(PlanSong.order_index).all()
lines = []
lines.append('=' * 60)
lines.append(f'WORSHIP SETLIST: {plan.date.strftime("%B %d, %Y")}')
if profile:
lines.append(f'Profile: {profile.name} (Key: {profile.default_key})')
if plan.memo:
lines.append(f'Notes: {plan.memo}')
lines.append('=' * 60)
lines.append('')
for idx, ps in enumerate(plan_songs, 1):
song = db.query(Song).get(ps.song_id)
if not song:
continue
lines.append(f'{idx}. {song.title}')
lines.append(f' Artist: {song.artist or "Unknown"} | Band: {song.band or "Unknown"}')
lines.append('')
if song.lyrics:
lines.append(' LYRICS:')
for line in song.lyrics.splitlines():
lines.append(f' {line}')
lines.append('')
if song.chords:
lines.append(' CHORDS:')
for line in song.chords.splitlines():
lines.append(f' {line}')
lines.append('')
lines.append('-' * 60)
lines.append('')
content = '\n'.join(lines)
response = app.make_response(content)
response.headers['Content-Type'] = 'text/plain; charset=utf-8'
response.headers['Content-Disposition'] = f'attachment; filename="setlist_{plan.date.isoformat()}.txt"'
return response
if __name__ == '__main__':
# Bind to all interfaces for optional external testing on alternate port
app.run(host='0.0.0.0', port=FLASK_PORT, debug=True)