import os import re import signal import sys import threading import time from datetime import datetime from fastapi import APIRouter, Request, Depends, Form, HTTPException from fastapi.responses import RedirectResponse, HTMLResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session from app.database import get_db from app.models import Playlist, Vote, VotingConfig from app.config import settings from app import spotify router = APIRouter(prefix="/admin", tags=["admin"]) templates = Jinja2Templates(directory="app/templates") _VALID_TYPES = {"playlist", "album", "artist", "track"} _URI_RE = re.compile(r"spotify:(playlist|album|artist|track):([A-Za-z0-9]+)") _URL_RE = re.compile(r"open\.spotify\.com/(playlist|album|artist|track)/([A-Za-z0-9]+)") _BARE_ID_RE = re.compile(r"^[A-Za-z0-9]{22}$") _TYPE_LABELS = { "playlist": ("πŸ“‹", "Playlist"), "album": ("πŸ’Ώ", "Álbum"), "artist": ("🎀", "Artista"), "track": ("🎡", "CanciΓ³n"), } def _extract_spotify_item(value: str) -> tuple[str, str] | None: """Retorna (spotify_id, spotify_type) o None si la URL no es vΓ‘lida.""" value = value.strip().split("?")[0].rstrip("/") if m := _URI_RE.search(value): return m.group(2), m.group(1) if m := _URL_RE.search(value): return m.group(2), m.group(1) if _BARE_ID_RE.match(value): return value, "playlist" return None def _fetch_metadata(sp, spotify_id: str, spotify_type: str) -> dict: if spotify_type == "playlist": data = sp.playlist(spotify_id, fields="name,description,images") return { "name": data["name"], "description": data.get("description") or "", "image_url": data["images"][0]["url"] if data.get("images") else "", } if spotify_type == "album": data = sp.album(spotify_id) artists = ", ".join(a["name"] for a in data.get("artists", [])) return { "name": f"{data['name']}", "description": f"{artists} Β· {str(data.get('release_date', ''))[:4]}", "image_url": data["images"][0]["url"] if data.get("images") else "", } if spotify_type == "artist": data = sp.artist(spotify_id) return { "name": data["name"], "description": "Artista", "image_url": data["images"][0]["url"] if data.get("images") else "", } if spotify_type == "track": data = sp.track(spotify_id) artists = ", ".join(a["name"] for a in data.get("artists", [])) return { "name": data["name"], "description": f"{artists} Β· {data['album']['name']}", "image_url": data["album"]["images"][0]["url"] if data["album"].get("images") else "", } return {"name": spotify_id, "description": "", "image_url": ""} def _require_admin(request: Request): if not request.session.get("admin_logged_in"): raise HTTPException(status_code=302, headers={"Location": "/admin/login"}) # ── Login ────────────────────────────────────────────────────────────────────── @router.get("/login", response_class=HTMLResponse) def login_page(request: Request): return templates.TemplateResponse("admin/login.html", {"request": request, "error": None}) @router.post("/login") def login(request: Request, username: str = Form(...), password: str = Form(...)): if username == settings.ADMIN_USERNAME and password == settings.ADMIN_PASSWORD: request.session["admin_logged_in"] = True return RedirectResponse(url="/admin/playlists", status_code=303) return templates.TemplateResponse( "admin/login.html", {"request": request, "error": "Usuario o contraseΓ±a incorrectos"}, status_code=401 ) @router.get("/logout") def logout(request: Request): request.session.clear() return RedirectResponse(url="/admin/login") # ── Playlists ────────────────────────────────────────────────────────────────── @router.get("/playlists", response_class=HTMLResponse) def list_playlists(request: Request, db: Session = Depends(get_db)): _require_admin(request) playlists = db.query(Playlist).order_by(Playlist.created_at.desc()).all() return templates.TemplateResponse( "admin/playlists.html", {"request": request, "playlists": playlists, "type_labels": _TYPE_LABELS, "error": None, "success": None}, ) @router.post("/playlists") def add_playlist( request: Request, spotify_url: str = Form(...), db: Session = Depends(get_db), ): _require_admin(request) parsed = _extract_spotify_item(spotify_url) if not parsed: playlists = db.query(Playlist).order_by(Playlist.created_at.desc()).all() return templates.TemplateResponse( "admin/playlists.html", {"request": request, "playlists": playlists, "type_labels": _TYPE_LABELS, "error": "URL no reconocida. Acepta links de Spotify de canciones, Γ‘lbumes, artistas o playlists.", "success": None}, status_code=400, ) spotify_id, spotify_type = parsed existing = db.query(Playlist).filter(Playlist.spotify_id == spotify_id).first() if existing: playlists = db.query(Playlist).order_by(Playlist.created_at.desc()).all() return templates.TemplateResponse( "admin/playlists.html", {"request": request, "playlists": playlists, "type_labels": _TYPE_LABELS, "error": "Este elemento ya estΓ‘ en el mantenedor", "success": None}, status_code=400, ) try: sp = spotify.get_client() meta = _fetch_metadata(sp, spotify_id, spotify_type) except Exception: meta = {"name": spotify_id, "description": "", "image_url": ""} item = Playlist( name=meta["name"], spotify_id=spotify_id, spotify_type=spotify_type, description=meta["description"], image_url=meta["image_url"], ) db.add(item) db.commit() playlists = db.query(Playlist).order_by(Playlist.created_at.desc()).all() label = _TYPE_LABELS.get(spotify_type, ("", spotify_type))[1] return templates.TemplateResponse( "admin/playlists.html", {"request": request, "playlists": playlists, "type_labels": _TYPE_LABELS, "error": None, "success": f'{label} "{meta["name"]}" agregado correctamente'}, ) @router.post("/playlists/{playlist_id}/edit") def edit_playlist( playlist_id: int, request: Request, name: str = Form(...), description: str = Form(""), emoji: str = Form(""), db: Session = Depends(get_db), ): _require_admin(request) playlist = db.query(Playlist).filter(Playlist.id == playlist_id).first() if not playlist: raise HTTPException(status_code=404, detail="Elemento no encontrado") playlist.name = name.strip() or playlist.name playlist.description = description.strip() playlist.emoji = emoji.strip() db.commit() return RedirectResponse(url="/admin/playlists", status_code=303) @router.post("/playlists/{playlist_id}/delete") def delete_playlist(playlist_id: int, request: Request, db: Session = Depends(get_db)): _require_admin(request) playlist = db.query(Playlist).filter(Playlist.id == playlist_id).first() if not playlist: raise HTTPException(status_code=404, detail="Playlist no encontrada") db.delete(playlist) db.commit() return RedirectResponse(url="/admin/playlists", status_code=303) # ── VotaciΓ³n ──────────────────────────────────────────────────────────────────── def _get_or_create_config(db: Session) -> VotingConfig: config = db.query(VotingConfig).first() if not config: config = VotingConfig(start_time="12:00", end_time="13:00", is_active=False) db.add(config) db.commit() db.refresh(config) return config def _vote_results(db: Session) -> list[dict]: playlists = db.query(Playlist).order_by(Playlist.created_at.desc()).all() results = [] for pl in playlists: count = db.query(Vote).filter(Vote.playlist_id == pl.id).count() results.append({"playlist": pl, "votes": count}) results.sort(key=lambda x: x["votes"], reverse=True) return results @router.get("/voting", response_class=HTMLResponse) def voting_admin(request: Request, db: Session = Depends(get_db)): _require_admin(request) config = _get_or_create_config(db) results = _vote_results(db) total_votes = sum(r["votes"] for r in results) return templates.TemplateResponse( "admin/voting.html", { "request": request, "config": config, "results": results, "total_votes": total_votes, "error": None, "success": None, }, ) @router.post("/voting") def update_voting_config( request: Request, start_time: str = Form(...), end_time: str = Form(...), is_active: str = Form(default="off"), db: Session = Depends(get_db), ): _require_admin(request) config = _get_or_create_config(db) try: datetime.strptime(start_time, "%H:%M") datetime.strptime(end_time, "%H:%M") except ValueError: results = _vote_results(db) return templates.TemplateResponse( "admin/voting.html", { "request": request, "config": config, "results": results, "total_votes": sum(r["votes"] for r in results), "error": "Formato de hora invΓ‘lido. Use HH:MM", "success": None, }, status_code=400, ) if start_time >= end_time: results = _vote_results(db) return templates.TemplateResponse( "admin/voting.html", { "request": request, "config": config, "results": results, "total_votes": sum(r["votes"] for r in results), "error": "La hora de inicio debe ser anterior a la hora de fin", "success": None, }, status_code=400, ) config.start_time = start_time config.end_time = end_time config.is_active = is_active == "on" db.commit() results = _vote_results(db) return templates.TemplateResponse( "admin/voting.html", { "request": request, "config": config, "results": results, "total_votes": sum(r["votes"] for r in results), "error": None, "success": "ConfiguraciΓ³n guardada", }, ) @router.post("/voting/reset") def reset_votes(request: Request, db: Session = Depends(get_db)): _require_admin(request) db.query(Vote).delete() db.commit() return RedirectResponse(url="/admin/voting", status_code=303) @router.post("/voting/play-winner") def play_winner(request: Request, db: Session = Depends(get_db)): _require_admin(request) results = _vote_results(db) if not results or results[0]["votes"] == 0: raise HTTPException(status_code=400, detail="No hay votos registrados") winner = results[0]["playlist"] try: sp = spotify.get_client() sp.start_playback(context_uri=f"spotify:playlist:{winner.spotify_id}") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) return RedirectResponse(url="/admin/voting", status_code=303) # ── Sistema ──────────────────────────────────────────────────────────────────── @router.post("/system/restart") def system_restart(request: Request): _require_admin(request) def _do(): time.sleep(0.8) os.execv(sys.executable, [sys.executable] + sys.argv) threading.Thread(target=_do, daemon=True).start() return templates.TemplateResponse( "admin/system_action.html", {"request": request, "action": "reiniciando", "message": "El servidor se estΓ‘ reiniciando…"}, ) @router.post("/system/shutdown") def system_shutdown(request: Request): _require_admin(request) def _do(): time.sleep(0.8) os.kill(os.getpid(), signal.SIGTERM) threading.Thread(target=_do, daemon=True).start() return templates.TemplateResponse( "admin/system_action.html", {"request": request, "action": "apagando", "message": "El servidor se estΓ‘ apagando…"}, )