Files
spotify-cantina/app/routers/admin.py
T
2026-04-24 18:51:38 -04:00

391 lines
13 KiB
Python

import logging
import os
import re
import secrets
import signal
import sys
import threading
import time
from datetime import datetime

from fastapi import APIRouter, Request, Depends, Form, HTTPException
from fastapi.responses import RedirectResponse, HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.database import get_db
from app.models import Playlist, Vote, VotingConfig
from app.config import settings
from app import spotify
# Router holding every admin endpoint; all routes below are mounted under /admin.
router = APIRouter(prefix="/admin", tags=["admin"])
# Jinja2 template loader rooted at app/templates.
templates = Jinja2Templates(directory="app/templates")
# Spotify item kinds handled by this module.
# NOTE(review): not referenced by any code visible in this section of the file.
_VALID_TYPES = {"playlist", "album", "artist", "track"}
# Spotify URI form: spotify:<type>:<id>. Group 1 is the type, group 2 the ID.
_URI_RE = re.compile(r"spotify:(playlist|album|artist|track):([A-Za-z0-9]+)")
# Web URL form: open.spotify.com/[<segment>/]<type>/<id>. The optional leading
# segment accepts lowercase letters and hyphens (presumably a locale prefix).
_URL_RE = re.compile(r"open\.spotify\.com/(?:[a-z-]+/)?(playlist|album|artist|track)/([A-Za-z0-9]+)")
# A bare ID: exactly 22 alphanumeric characters and nothing else.
_BARE_ID_RE = re.compile(r"^[A-Za-z0-9]{22}$")
# Maps each item type to an (emoji, Spanish display label) pair for the UI.
_TYPE_LABELS = {
"playlist": ("📋", "Playlist"),
"album": ("💿", "Álbum"),
"artist": ("🎤", "Artista"),
"track": ("🎵", "Canción"),
}
def _extract_spotify_item(value: str) -> tuple[str, str] | None:
    """Parse a Spotify reference into ``(spotify_id, spotify_type)``.

    Accepts a ``spotify:<type>:<id>`` URI, an ``open.spotify.com`` URL, or a
    bare 22-character ID (treated as a playlist). Surrounding whitespace,
    everything from the first ``?`` onwards, and trailing ``/`` characters are
    discarded before matching.

    Returns:
        The ``(id, type)`` pair, or ``None`` when the input matches none of
        the accepted forms.
    """
    cleaned = value.strip().partition("?")[0].rstrip("/")

    # The URI form is tried first, then the URL form; both capture (type, id).
    for pattern in (_URI_RE, _URL_RE):
        found = pattern.search(cleaned)
        if found is not None:
            kind, item_id = found.groups()
            return item_id, kind

    if _BARE_ID_RE.match(cleaned) is None:
        return None
    # A bare ID carries no type information, so it defaults to a playlist.
    return cleaned, "playlist"
def _fetch_metadata(sp, spotify_id: str, spotify_type: str) -> dict:
if spotify_type == "playlist":
data = sp.playlist(spotify_id)
return {
"name": data.get("name") or spotify_id,
"description": data.get("description") or "",
"image_url": data["images"][0]["url"] if data.get("images") else "",
}
if spotify_type == "album":
data = sp.album(spotify_id)
artists = ", ".join(a["name"] for a in data.get("artists", []))
return {
"name": f"{data['name']}",
"description": f"{artists} · {str(data.get('release_date', ''))[:4]}",
"image_url": data["images"][0]["url"] if data.get("images") else "",
}
if spotify_type == "artist":
data = sp.artist(spotify_id)
return {
"name": data["name"],
"description": "Artista",
"image_url": data["images"][0]["url"] if data.get("images") else "",
}
if spotify_type == "track":
data = sp.track(spotify_id)
artists = ", ".join(a["name"] for a in data.get("artists", []))
return {
"name": data["name"],
"description": f"{artists} · {data['album']['name']}",
"image_url": data["album"]["images"][0]["url"] if data["album"].get("images") else "",
}
return {"name": spotify_id, "description": "", "image_url": ""}
def _require_admin(request: Request):
    """Let the request continue only when its session is flagged as admin.

    Raises:
        HTTPException: status 302 with a ``Location: /admin/login`` header
            when the session has no truthy ``admin_logged_in`` entry.
    """
    if request.session.get("admin_logged_in"):
        return
    raise HTTPException(status_code=302, headers={"Location": "/admin/login"})
# ── Login ──────────────────────────────────────────────────────────────────────
@router.get("/status")
def admin_status(request: Request):
    """Report whether the current session is logged in as admin."""
    is_admin = bool(request.session.get("admin_logged_in"))
    return {"logged_in": is_admin}
@router.get("/login", response_class=HTMLResponse)
def login_page(request: Request):
    """Render the admin login form with no error message."""
    context = {"request": request, "error": None}
    return templates.TemplateResponse("admin/login.html", context)
def _credential_matches(supplied: str, expected: object) -> bool:
    """Compare a submitted credential with the configured one in constant time."""
    if not isinstance(expected, str):
        # A non-string setting cannot equal a submitted form string.
        return False
    # Compare bytes: compare_digest rejects str arguments containing non-ASCII characters.
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


@router.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check the submitted credentials and open an admin session.

    Args:
        request: Incoming request; its session receives the login flag.
        username: Submitted username form field.
        password: Submitted password form field.

    Returns:
        A 303 redirect to ``/admin/playlists`` on success, otherwise the login
        page re-rendered with an error message and status 401.
    """
    # Evaluate both comparisons unconditionally so the response time does not
    # reveal which of the two fields was wrong.
    username_ok = _credential_matches(username, settings.ADMIN_USERNAME)
    password_ok = _credential_matches(password, settings.ADMIN_PASSWORD)

    if username_ok and password_ok:
        request.session["admin_logged_in"] = True
        return RedirectResponse(url="/admin/playlists", status_code=303)

    return templates.TemplateResponse(
        "admin/login.html",
        {"request": request, "error": "Usuario o contraseña incorrectos"},
        status_code=401,
    )
@router.get("/logout")
def logout(request: Request):
    """Empty the session and redirect to the admin login page."""
    session = request.session
    session.clear()
    return RedirectResponse(url="/admin/login")
# ── Playlists ──────────────────────────────────────────────────────────────────
@router.get("/playlists", response_class=HTMLResponse)
def list_playlists(request: Request, db: Session = Depends(get_db)):
    """Render the maintenance page listing every stored item, newest first."""
    _require_admin(request)
    newest_first = Playlist.created_at.desc()
    context = {
        "request": request,
        "playlists": db.query(Playlist).order_by(newest_first).all(),
        "type_labels": _TYPE_LABELS,
        "error": None,
        "success": None,
    }
    return templates.TemplateResponse("admin/playlists.html", context)
def _render_playlists_page(
    request: Request,
    db: Session,
    *,
    error: str | None = None,
    success: str | None = None,
    status_code: int = 200,
):
    """Render the maintenance page with a fresh listing and an optional banner message."""
    playlists = db.query(Playlist).order_by(Playlist.created_at.desc()).all()
    return templates.TemplateResponse(
        "admin/playlists.html",
        {
            "request": request,
            "playlists": playlists,
            "type_labels": _TYPE_LABELS,
            "error": error,
            "success": success,
        },
        status_code=status_code,
    )


@router.post("/playlists")
def add_playlist(
    request: Request,
    spotify_url: str = Form(...),
    db: Session = Depends(get_db),
):
    """Add a Spotify playlist, album, artist or track to the maintained list.

    Args:
        request: Incoming request; must belong to an admin session.
        spotify_url: Spotify URL, URI or bare ID submitted by the form.
        db: Database session.

    Returns:
        The maintenance page: status 400 with an error when the reference is
        not recognised or the item already exists, otherwise status 200 with a
        success message.
    """
    _require_admin(request)

    parsed = _extract_spotify_item(spotify_url)
    if not parsed:
        return _render_playlists_page(
            request,
            db,
            error="URL no reconocida. Acepta links de Spotify de canciones, álbumes, artistas o playlists.",
            status_code=400,
        )

    spotify_id, spotify_type = parsed
    existing = db.query(Playlist).filter(Playlist.spotify_id == spotify_id).first()
    if existing:
        return _render_playlists_page(
            request,
            db,
            error="Este elemento ya está en el mantenedor",
            status_code=400,
        )

    try:
        sp = spotify.get_client()
        meta = _fetch_metadata(sp, spotify_id, spotify_type)
    except Exception:
        # Metadata is best-effort: the item is still stored under its raw ID,
        # but the failure is logged so it can be diagnosed afterwards.
        logging.getLogger(__name__).warning(
            "Could not fetch Spotify metadata for %s %s; storing placeholder values",
            spotify_type,
            spotify_id,
            exc_info=True,
        )
        meta = {"name": spotify_id, "description": "", "image_url": ""}

    item = Playlist(
        name=meta["name"],
        spotify_id=spotify_id,
        spotify_type=spotify_type,
        description=meta["description"],
        image_url=meta["image_url"],
    )
    db.add(item)
    db.commit()

    label = _TYPE_LABELS.get(spotify_type, ("", spotify_type))[1]
    item_name = meta["name"]
    return _render_playlists_page(
        request,
        db,
        success=f'{label} "{item_name}" agregado correctamente',
    )
@router.post("/playlists/{playlist_id}/edit")
def edit_playlist(
    playlist_id: int,
    request: Request,
    name: str = Form(...),
    description: str = Form(""),
    emoji: str = Form(""),
    db: Session = Depends(get_db),
):
    """Update the name, description and emoji of a stored item.

    A name that is blank after trimming leaves the current name unchanged.

    Raises:
        HTTPException: 404 when no item has the given ID.
    """
    _require_admin(request)

    target = db.query(Playlist).filter(Playlist.id == playlist_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Elemento no encontrado")

    trimmed_name = name.strip()
    target.name = trimmed_name if trimmed_name else target.name
    target.description = description.strip()
    target.emoji = emoji.strip()
    db.commit()

    return RedirectResponse(url="/admin/playlists", status_code=303)
@router.post("/playlists/{playlist_id}/delete")
def delete_playlist(playlist_id: int, request: Request, db: Session = Depends(get_db)):
    """Delete a stored item and redirect back to the maintenance page.

    Raises:
        HTTPException: 404 when no item has the given ID.
    """
    _require_admin(request)

    by_id = Playlist.id == playlist_id
    target = db.query(Playlist).filter(by_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Playlist no encontrada")

    db.delete(target)
    db.commit()
    return RedirectResponse(url="/admin/playlists", status_code=303)
# ── Votación ────────────────────────────────────────────────────────────────────
def _get_or_create_config(db: Session) -> VotingConfig:
    """Return the first stored voting configuration, creating a default if none exists.

    The default is an inactive window from 12:00 to 13:00.
    """
    existing = db.query(VotingConfig).first()
    if existing:
        return existing

    created = VotingConfig(start_time="12:00", end_time="13:00", is_active=False)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
def _vote_results(db: Session) -> list[dict]:
    """Return every stored item with its vote count, most voted first.

    Args:
        db: Database session.

    Returns:
        A list of ``{"playlist": <Playlist>, "votes": <int>}`` dicts sorted by
        votes in descending order. Items with equal votes keep newest-first
        order, and items without votes are included with a count of 0.
    """
    playlists = db.query(Playlist).order_by(Playlist.created_at.desc()).all()

    # One grouped query for all tallies instead of one COUNT query per item.
    tallies = {
        playlist_id: total
        for playlist_id, total in (
            db.query(Vote.playlist_id, func.count(Vote.playlist_id))
            .group_by(Vote.playlist_id)
            .all()
        )
    }

    # sorted() is stable, so ties retain the created_at ordering from above.
    return sorted(
        ({"playlist": pl, "votes": tallies.get(pl.id, 0)} for pl in playlists),
        key=lambda row: row["votes"],
        reverse=True,
    )
@router.get("/voting", response_class=HTMLResponse)
def voting_admin(request: Request, db: Session = Depends(get_db)):
    """Render the voting page with the current config, tallies and server time."""
    _require_admin(request)

    config = _get_or_create_config(db)
    results = _vote_results(db)
    vote_sum = 0
    for row in results:
        vote_sum += row["votes"]

    context = {
        "request": request,
        "config": config,
        "results": results,
        "total_votes": vote_sum,
        "server_time": datetime.now().strftime("%H:%M:%S"),
        "error": None,
        "success": None,
    }
    return templates.TemplateResponse("admin/voting.html", context)
def _render_voting_page(
    request: Request,
    db: Session,
    config: VotingConfig,
    server_time: str,
    *,
    error: str | None = None,
    success: str | None = None,
    status_code: int = 200,
):
    """Render the voting page with fresh tallies and an optional banner message."""
    results = _vote_results(db)
    return templates.TemplateResponse(
        "admin/voting.html",
        {
            "request": request,
            "config": config,
            "results": results,
            "total_votes": sum(r["votes"] for r in results),
            "server_time": server_time,
            "error": error,
            "success": success,
        },
        status_code=status_code,
    )


@router.post("/voting")
def update_voting_config(
    request: Request,
    start_time: str = Form(...),
    end_time: str = Form(...),
    is_active: str = Form(default="off"),
    db: Session = Depends(get_db),
):
    """Validate and store the voting window, resetting votes on activation.

    Args:
        request: Incoming request; must belong to an admin session.
        start_time: Window start in ``HH:MM`` form.
        end_time: Window end in ``HH:MM`` form; must be later than the start.
        is_active: ``"on"`` enables voting; any other value disables it.
        db: Database session.

    Returns:
        The voting page: status 400 with an error for invalid times, otherwise
        status 200 with a confirmation message.
    """
    _require_admin(request)
    config = _get_or_create_config(db)
    now_str = datetime.now().strftime("%H:%M:%S")

    try:
        parsed_start = datetime.strptime(start_time, "%H:%M")
        parsed_end = datetime.strptime(end_time, "%H:%M")
    except ValueError:
        return _render_voting_page(
            request,
            db,
            config,
            now_str,
            error="Formato de hora inválido. Use HH:MM",
            status_code=400,
        )

    # Compare the parsed times, not the raw strings: strptime accepts "9:30",
    # which sorts after "10:00" lexicographically and would be wrongly rejected.
    if parsed_start >= parsed_end:
        return _render_voting_page(
            request,
            db,
            config,
            now_str,
            error="La hora de inicio debe ser anterior a la hora de fin",
            status_code=400,
        )

    was_active = config.is_active
    new_active = is_active == "on"

    # Store zero-padded HH:MM so the saved values always have the same shape.
    config.start_time = parsed_start.strftime("%H:%M")
    config.end_time = parsed_end.strftime("%H:%M")
    config.is_active = new_active

    # Switching from inactive to active opens a new window, so old votes are
    # dropped. This happens in the same transaction as the config change so
    # the two cannot be persisted separately.
    votes_reset = new_active and not was_active
    if votes_reset:
        db.query(Vote).delete()
    db.commit()

    success_msg = "Configuración guardada"
    if votes_reset:
        success_msg = "Configuración guardada · Votos reiniciados para la nueva ventana"
    return _render_voting_page(request, db, config, now_str, success=success_msg)
@router.post("/voting/reset")
def reset_votes(request: Request, db: Session = Depends(get_db)):
    """Delete every stored vote and redirect back to the voting page."""
    _require_admin(request)
    all_votes = db.query(Vote)
    all_votes.delete()
    db.commit()
    return RedirectResponse(url="/admin/voting", status_code=303)
@router.post("/voting/play-winner")
def play_winner(request: Request, db: Session = Depends(get_db)):
    """Start playback of the most voted item, then clear all votes.

    Args:
        request: Incoming request; must belong to an admin session.
        db: Database session.

    Returns:
        A 303 redirect to ``/admin/voting``.

    Raises:
        HTTPException: 400 when there are no items or the leader has no votes;
            500 carrying the error text when starting playback fails.
    """
    _require_admin(request)

    results = _vote_results(db)
    if not results or results[0]["votes"] == 0:
        raise HTTPException(status_code=400, detail="No hay votos registrados")

    winner = results[0]["playlist"]

    # Build the URI from the stored item type instead of assuming a playlist;
    # fall back to "playlist" when the type is missing.
    item_type = getattr(winner, "spotify_type", None) or "playlist"
    item_uri = f"spotify:{item_type}:{winner.spotify_id}"
    # A single track is not a playback context, so it is passed through `uris`;
    # playlists, albums and artists go through `context_uri`.
    if item_type == "track":
        playback_args = {"uris": [item_uri]}
    else:
        playback_args = {"context_uri": item_uri}

    try:
        sp = spotify.get_client()
        sp.start_playback(**playback_args)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    # Votes are cleared only after playback started successfully.
    db.query(Vote).delete()
    db.commit()
    return RedirectResponse(url="/admin/voting", status_code=303)
# ── Sistema ────────────────────────────────────────────────────────────────────
@router.post("/system/restart")
def system_restart(request: Request):
    """Schedule a restart of the server process and render a notice page.

    A daemon thread waits 0.8 seconds and then replaces the current process
    with a new run of the same interpreter and command-line arguments.
    """
    _require_admin(request)

    def _reexec_after_delay():
        # Presumably the delay lets this response be delivered first.
        time.sleep(0.8)
        os.execv(sys.executable, [sys.executable, *sys.argv])

    worker = threading.Thread(target=_reexec_after_delay, daemon=True)
    worker.start()

    context = {
        "request": request,
        "action": "reiniciando",
        "message": "El servidor se está reiniciando…",
    }
    return templates.TemplateResponse("admin/system_action.html", context)
@router.post("/system/shutdown")
def system_shutdown(request: Request):
    """Schedule a shutdown of the server process and render a notice page.

    A daemon thread waits 0.8 seconds and then sends SIGTERM to the current
    process.
    """
    _require_admin(request)

    def _terminate_after_delay():
        # Presumably the delay lets this response be delivered first.
        time.sleep(0.8)
        own_pid = os.getpid()
        os.kill(own_pid, signal.SIGTERM)

    worker = threading.Thread(target=_terminate_after_delay, daemon=True)
    worker.start()

    context = {
        "request": request,
        "action": "apagando",
        "message": "El servidor se está apagando…",
    }
    return templates.TemplateResponse("admin/system_action.html", context)