Files
spotify-cantina/app/routers/admin.py
T
deivid 36a40938c7 corrige zona horaria en votación: TZ=America/Santiago en contenedor
El contenedor corría en UTC causando que la ventana horaria no
coincidiera con la hora local. Se agrega TZ en docker-compose y se
muestra la hora actual del servidor en el panel de votación.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-23 18:16:39 -04:00

377 lines
13 KiB
Python

import logging
import os
import re
import secrets
import signal
import sys
import threading
import time
from datetime import datetime

from fastapi import APIRouter, Request, Depends, Form, HTTPException
from fastapi.responses import RedirectResponse, HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session

from app import spotify
from app.config import settings
from app.database import get_db
from app.models import Playlist, Vote, VotingConfig
# Every route declared in this module is registered under the /admin prefix.
router = APIRouter(prefix="/admin", tags=["admin"])
# Template loader rooted at the relative path "app/templates".
templates = Jinja2Templates(directory="app/templates")
# The four Spotify item kinds that the patterns below can capture.
_VALID_TYPES = {"playlist", "album", "artist", "track"}
# Spotify URI form: "spotify:<type>:<id>"; group 1 is the type, group 2 the ID.
_URI_RE = re.compile(r"spotify:(playlist|album|artist|track):([A-Za-z0-9]+)")
# Web link form: "open.spotify.com/<type>/<id>", optionally with one extra
# lowercase/hyphen path segment before the type (presumably a locale prefix
# such as "intl-es" -- confirm against real share links).
_URL_RE = re.compile(r"open\.spotify\.com/(?:[a-z-]+/)?(playlist|album|artist|track)/([A-Za-z0-9]+)")
# A bare identifier: exactly 22 ASCII alphanumeric characters and nothing else.
_BARE_ID_RE = re.compile(r"^[A-Za-z0-9]{22}$")
# Item type -> (emoji, display label) pair handed to the templates.
_TYPE_LABELS = {
    "playlist": ("📋", "Playlist"),
    "album": ("💿", "Álbum"),
    "artist": ("🎤", "Artista"),
    "track": ("🎵", "Canción"),
}
def _extract_spotify_item(value: str) -> tuple[str, str] | None:
    """Parse user input into ``(spotify_id, spotify_type)``.

    Surrounding whitespace, everything from the first ``?`` onwards and any
    trailing slashes are discarded before matching. The URI pattern is tried
    first, then the web-link pattern; a bare 22-character ID is treated as a
    playlist.

    Returns:
        The ``(spotify_id, spotify_type)`` pair, or ``None`` when the input
        matches none of the supported forms.
    """
    cleaned = value.strip().partition("?")[0].rstrip("/")
    for pattern in (_URI_RE, _URL_RE):
        found = pattern.search(cleaned)
        if found:
            item_type, item_id = found.groups()
            return item_id, item_type
    if not _BARE_ID_RE.match(cleaned):
        return None
    # A bare ID carries no type information, so it defaults to a playlist.
    return cleaned, "playlist"
def _fetch_metadata(sp, spotify_id: str, spotify_type: str) -> dict:
if spotify_type == "playlist":
data = sp.playlist(spotify_id)
return {
"name": data.get("name") or spotify_id,
"description": data.get("description") or "",
"image_url": data["images"][0]["url"] if data.get("images") else "",
}
if spotify_type == "album":
data = sp.album(spotify_id)
artists = ", ".join(a["name"] for a in data.get("artists", []))
return {
"name": f"{data['name']}",
"description": f"{artists} · {str(data.get('release_date', ''))[:4]}",
"image_url": data["images"][0]["url"] if data.get("images") else "",
}
if spotify_type == "artist":
data = sp.artist(spotify_id)
return {
"name": data["name"],
"description": "Artista",
"image_url": data["images"][0]["url"] if data.get("images") else "",
}
if spotify_type == "track":
data = sp.track(spotify_id)
artists = ", ".join(a["name"] for a in data.get("artists", []))
return {
"name": data["name"],
"description": f"{artists} · {data['album']['name']}",
"image_url": data["album"]["images"][0]["url"] if data["album"].get("images") else "",
}
return {"name": spotify_id, "description": "", "image_url": ""}
def _require_admin(request: Request):
    """Let authenticated admin sessions through; redirect everyone else to the login page.

    Raises:
        HTTPException: status 302 with a ``Location: /admin/login`` header
            when the session has no truthy ``admin_logged_in`` flag.
    """
    if request.session.get("admin_logged_in"):
        return
    raise HTTPException(status_code=302, headers={"Location": "/admin/login"})
# ── Login ──────────────────────────────────────────────────────────────────────
@router.get("/status")
def admin_status(request: Request):
    """Report whether the current session is logged in as admin."""
    flag = request.session.get("admin_logged_in")
    return {"logged_in": True if flag else False}
@router.get("/login", response_class=HTMLResponse)
def login_page(request: Request):
    """Render the admin login form with no error message."""
    context = {"request": request, "error": None}
    return templates.TemplateResponse("admin/login.html", context)
def _credential_matches(supplied: str, expected: object) -> bool:
    """Compare a submitted credential with the configured one in constant time.

    Both values are encoded to UTF-8 bytes because ``secrets.compare_digest``
    rejects ``str`` arguments containing non-ASCII characters. A configured
    value that is not a string never matches, which is what a plain ``==``
    against the submitted string would also yield.
    """
    if not isinstance(expected, str):
        return False
    return secrets.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))


@router.post("/login")
def login(request: Request, username: str = Form(...), password: str = Form(...)):
    """Validate the submitted admin credentials and open a session.

    Args:
        request: Incoming request; its session receives the login flag.
        username: Submitted user name (form field).
        password: Submitted password (form field).

    Returns:
        A 303 redirect to ``/admin/playlists`` on success, otherwise the login
        page re-rendered with an error message and status 401.
    """
    # Evaluate both comparisons unconditionally so the response time does not
    # reveal whether the user name alone was correct.
    username_ok = _credential_matches(username, settings.ADMIN_USERNAME)
    password_ok = _credential_matches(password, settings.ADMIN_PASSWORD)
    if username_ok and password_ok:
        request.session["admin_logged_in"] = True
        return RedirectResponse(url="/admin/playlists", status_code=303)
    return templates.TemplateResponse(
        "admin/login.html",
        {"request": request, "error": "Usuario o contraseña incorrectos"},
        status_code=401,
    )
@router.get("/logout")
def logout(request: Request):
    """Drop everything stored in the session and send the user to the login page."""
    session = request.session
    session.clear()
    return RedirectResponse(url="/admin/login")
# ── Playlists ──────────────────────────────────────────────────────────────────
@router.get("/playlists", response_class=HTMLResponse)
def list_playlists(request: Request, db: Session = Depends(get_db)):
    """Render the maintenance page listing every stored item, newest first."""
    _require_admin(request)
    newest_first = Playlist.created_at.desc()
    context = {
        "request": request,
        "playlists": db.query(Playlist).order_by(newest_first).all(),
        "type_labels": _TYPE_LABELS,
        "error": None,
        "success": None,
    }
    return templates.TemplateResponse("admin/playlists.html", context)
def _render_playlists(
    request: Request,
    db: Session,
    *,
    error: str | None = None,
    success: str | None = None,
    status_code: int = 200,
):
    """Render the maintenance page with the current items and an optional banner message."""
    playlists = db.query(Playlist).order_by(Playlist.created_at.desc()).all()
    return templates.TemplateResponse(
        "admin/playlists.html",
        {
            "request": request,
            "playlists": playlists,
            "type_labels": _TYPE_LABELS,
            "error": error,
            "success": success,
        },
        status_code=status_code,
    )


@router.post("/playlists")
def add_playlist(
    request: Request,
    spotify_url: str = Form(...),
    db: Session = Depends(get_db),
):
    """Add a Spotify item (playlist, album, artist or track) to the catalogue.

    Args:
        request: Incoming request; must belong to a logged-in admin session.
        spotify_url: Spotify link, URI or bare ID submitted through the form.
        db: Database session.

    Returns:
        The maintenance page: status 400 with an error banner when the input
        is not recognised or the item is already stored, otherwise the page
        with a success banner after the item has been inserted.
    """
    _require_admin(request)
    parsed = _extract_spotify_item(spotify_url)
    if not parsed:
        return _render_playlists(
            request,
            db,
            error="URL no reconocida. Acepta links de Spotify de canciones, álbumes, artistas o playlists.",
            status_code=400,
        )
    spotify_id, spotify_type = parsed
    existing = db.query(Playlist).filter(Playlist.spotify_id == spotify_id).first()
    if existing:
        return _render_playlists(
            request,
            db,
            error="Este elemento ya está en el mantenedor",
            status_code=400,
        )
    try:
        sp = spotify.get_client()
        meta = _fetch_metadata(sp, spotify_id, spotify_type)
    except Exception:
        # Best effort: the item is still stored with placeholder metadata, but
        # the failure is logged instead of disappearing silently.
        logging.getLogger(__name__).warning(
            "Could not fetch Spotify metadata for %s %s; storing placeholders",
            spotify_type,
            spotify_id,
            exc_info=True,
        )
        meta = {"name": spotify_id, "description": "", "image_url": ""}
    item = Playlist(
        name=meta["name"],
        spotify_id=spotify_id,
        spotify_type=spotify_type,
        description=meta["description"],
        image_url=meta["image_url"],
    )
    db.add(item)
    db.commit()
    label = _TYPE_LABELS.get(spotify_type, ("", spotify_type))[1]
    return _render_playlists(
        request,
        db,
        success=f'{label} "{meta["name"]}" agregado correctamente',
    )
@router.post("/playlists/{playlist_id}/edit")
def edit_playlist(
    playlist_id: int,
    request: Request,
    name: str = Form(...),
    description: str = Form(""),
    emoji: str = Form(""),
    db: Session = Depends(get_db),
):
    """Update the editable fields of a stored item.

    A name that is blank after trimming leaves the stored name unchanged;
    description and emoji are always overwritten with the trimmed input.

    Raises:
        HTTPException: 404 when no item has the given ID.
    """
    _require_admin(request)
    record = db.query(Playlist).filter(Playlist.id == playlist_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="Elemento no encontrado")
    trimmed_name = name.strip()
    if trimmed_name:
        record.name = trimmed_name
    record.description = description.strip()
    record.emoji = emoji.strip()
    db.commit()
    return RedirectResponse(url="/admin/playlists", status_code=303)
@router.post("/playlists/{playlist_id}/delete")
def delete_playlist(playlist_id: int, request: Request, db: Session = Depends(get_db)):
    """Remove a stored item and return to the maintenance page.

    Raises:
        HTTPException: 404 when no item has the given ID.
    """
    _require_admin(request)
    by_id = Playlist.id == playlist_id
    target = db.query(Playlist).filter(by_id).first()
    if target:
        db.delete(target)
        db.commit()
        return RedirectResponse(url="/admin/playlists", status_code=303)
    raise HTTPException(status_code=404, detail="Playlist no encontrada")
# ── Votación ────────────────────────────────────────────────────────────────────
def _get_or_create_config(db: Session) -> VotingConfig:
    """Return the first stored voting configuration, creating a default one if none exists.

    The default is an inactive window from 12:00 to 13:00; it is committed and
    refreshed before being returned.
    """
    stored = db.query(VotingConfig).first()
    if stored:
        return stored
    default_config = VotingConfig(start_time="12:00", end_time="13:00", is_active=False)
    db.add(default_config)
    db.commit()
    db.refresh(default_config)
    return default_config
def _vote_results(db: Session) -> list[dict]:
    """Tally votes per stored item.

    Returns:
        One ``{"playlist": <Playlist>, "votes": <int>}`` dict per item, sorted
        by vote count descending. Ties keep the newest-first order in which
        the items were loaded, because the sort is stable.
    """
    newest_first = db.query(Playlist).order_by(Playlist.created_at.desc()).all()
    tally = [
        {
            "playlist": item,
            "votes": db.query(Vote).filter(Vote.playlist_id == item.id).count(),
        }
        for item in newest_first
    ]
    return sorted(tally, key=lambda row: row["votes"], reverse=True)
@router.get("/voting", response_class=HTMLResponse)
def voting_admin(request: Request, db: Session = Depends(get_db)):
    """Render the voting panel: configuration, tally, total votes and current server time."""
    _require_admin(request)
    config = _get_or_create_config(db)
    tally = _vote_results(db)
    vote_total = 0
    for row in tally:
        vote_total += row["votes"]
    context = {
        "request": request,
        "config": config,
        "results": tally,
        "total_votes": vote_total,
        # Clock of the server process, formatted as HH:MM:SS.
        "server_time": datetime.now().strftime("%H:%M:%S"),
        "error": None,
        "success": None,
    }
    return templates.TemplateResponse("admin/voting.html", context)
def _render_voting_page(
    request: Request,
    db: Session,
    config: VotingConfig,
    server_time: str,
    *,
    error: str | None = None,
    success: str | None = None,
    status_code: int = 200,
):
    """Render the voting panel with a fresh tally and an optional banner message."""
    results = _vote_results(db)
    return templates.TemplateResponse(
        "admin/voting.html",
        {
            "request": request,
            "config": config,
            "results": results,
            "total_votes": sum(r["votes"] for r in results),
            "server_time": server_time,
            "error": error,
            "success": success,
        },
        status_code=status_code,
    )


@router.post("/voting")
def update_voting_config(
    request: Request,
    start_time: str = Form(...),
    end_time: str = Form(...),
    is_active: str = Form(default="off"),
    db: Session = Depends(get_db),
):
    """Validate and store the voting window.

    Args:
        request: Incoming request; must belong to a logged-in admin session.
        start_time: Window start as ``HH:MM``.
        end_time: Window end as ``HH:MM``.
        is_active: ``"on"`` enables voting; any other value disables it.
        db: Database session.

    Returns:
        The voting panel: status 400 with an error banner when a time cannot
        be parsed or the start is not strictly before the end, otherwise the
        panel with a confirmation banner after the configuration is saved.
    """
    _require_admin(request)
    config = _get_or_create_config(db)
    now_str = datetime.now().strftime("%H:%M:%S")
    try:
        start = datetime.strptime(start_time, "%H:%M")
        end = datetime.strptime(end_time, "%H:%M")
    except ValueError:
        return _render_voting_page(
            request,
            db,
            config,
            now_str,
            error="Formato de hora inválido. Use HH:MM",
            status_code=400,
        )
    # Compare the parsed values, not the raw strings: "%H:%M" also accepts
    # non-padded input such as "9:00", which sorts after "10:00" as text.
    if start >= end:
        return _render_voting_page(
            request,
            db,
            config,
            now_str,
            error="La hora de inicio debe ser anterior a la hora de fin",
            status_code=400,
        )
    # Store the zero-padded form so the saved strings always have the same
    # HH:MM shape regardless of how the input was typed.
    config.start_time = start.strftime("%H:%M")
    config.end_time = end.strftime("%H:%M")
    config.is_active = is_active == "on"
    db.commit()
    return _render_voting_page(
        request,
        db,
        config,
        now_str,
        success="Configuración guardada",
    )
@router.post("/voting/reset")
def reset_votes(request: Request, db: Session = Depends(get_db)):
    """Delete every stored vote and return to the voting panel."""
    _require_admin(request)
    all_votes = db.query(Vote)
    all_votes.delete()
    db.commit()
    return RedirectResponse(url="/admin/voting", status_code=303)
@router.post("/voting/play-winner")
def play_winner(request: Request, db: Session = Depends(get_db)):
    """Start playback of the item that currently has the most votes.

    Returns:
        A 303 redirect to the voting panel once playback has been requested.

    Raises:
        HTTPException: 400 when there are no items or the leader has zero
            votes; 500 (with the underlying error text) when obtaining the
            client or starting playback fails.
    """
    _require_admin(request)
    results = _vote_results(db)
    if not results or results[0]["votes"] == 0:
        raise HTTPException(status_code=400, detail="No hay votos registrados")
    winner = results[0]["playlist"]
    # The catalogue also holds albums, artists and tracks, so the URI must use
    # the stored type; rows without a type are treated as playlists.
    item_type = getattr(winner, "spotify_type", None) or "playlist"
    uri = f"spotify:{item_type}:{winner.spotify_id}"
    try:
        sp = spotify.get_client()
        if item_type == "track":
            # A single track is not a playback context; it goes in `uris`.
            sp.start_playback(uris=[uri])
        else:
            sp.start_playback(context_uri=uri)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return RedirectResponse(url="/admin/voting", status_code=303)
# ── Sistema ────────────────────────────────────────────────────────────────────
@router.post("/system/restart")
def system_restart(request: Request):
    """Schedule a re-exec of the server process and render the "restarting" page."""
    _require_admin(request)

    def _reexec_after_delay():
        # The short pause gives the response below time to be sent first.
        time.sleep(0.8)
        os.execv(sys.executable, [sys.executable, *sys.argv])

    worker = threading.Thread(target=_reexec_after_delay, daemon=True)
    worker.start()
    context = {
        "request": request,
        "action": "reiniciando",
        "message": "El servidor se está reiniciando…",
    }
    return templates.TemplateResponse("admin/system_action.html", context)
@router.post("/system/shutdown")
def system_shutdown(request: Request):
    """Schedule a SIGTERM to the server's own process and render the "shutting down" page."""
    _require_admin(request)

    def _terminate_after_delay():
        # The short pause gives the response below time to be sent first.
        time.sleep(0.8)
        os.kill(os.getpid(), signal.SIGTERM)

    worker = threading.Thread(target=_terminate_after_delay, daemon=True)
    worker.start()
    context = {
        "request": request,
        "action": "apagando",
        "message": "El servidor se está apagando…",
    }
    return templates.TemplateResponse("admin/system_action.html", context)