import httpx import logging import re from typing import List, Optional from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from ..db_models import Anime, AnimeTheme, ThemeType from ..schemas import AnimeSearchResult logger = logging.getLogger(__name__) # Shared HTTP client for AnimeThemes API _animethemes_client: Optional[httpx.AsyncClient] = None def _get_animethemes_client() -> httpx.AsyncClient: """Get or create shared HTTP client for AnimeThemes.""" global _animethemes_client if _animethemes_client is None or _animethemes_client.is_closed: _animethemes_client = httpx.AsyncClient( base_url="https://api.animethemes.moe", timeout=30.0, ) return _animethemes_client class AnimeThemesService: """Service for AnimeThemes API (api.animethemes.moe).""" def __init__(self): self.client = _get_animethemes_client() async def search(self, query: str, limit: int = 20) -> List[AnimeSearchResult]: """Search anime by query using AnimeThemes API.""" try: response = await self.client.get( "/anime", params={ "q": query, "include": "images", "page[size]": limit, }, ) response.raise_for_status() data = response.json() results = [] for anime in data.get("anime", []): # Get poster URL from images poster_url = None images = anime.get("images", []) if images: # Prefer large_cover or first available for img in images: if img.get("facet") == "Large Cover": poster_url = img.get("link") break if not poster_url and images: poster_url = images[0].get("link") results.append(AnimeSearchResult( animethemes_slug=anime.get("slug"), title_english=anime.get("name"), year=anime.get("year"), poster_url=poster_url, )) return results except Exception as e: logger.error(f"Failed to search AnimeThemes: {e}") return [] async def get_or_create_anime(self, db: AsyncSession, slug: str) -> Optional[Anime]: """Get anime from DB or fetch from AnimeThemes and create.""" # Check if exists (with themes eagerly loaded) query = ( select(Anime) .where(Anime.animethemes_slug == slug) .options(selectinload(Anime.themes)) ) result = await db.execute(query) anime = result.scalar_one_or_none() if anime: return anime # Fetch from AnimeThemes try: response = await self.client.get( f"/anime/{slug}", params={"include": "images"}, ) response.raise_for_status() data = response.json() except Exception as e: logger.error(f"Failed to fetch anime from AnimeThemes: {e}") return None anime_data = data.get("anime", {}) if not anime_data: return None # Get poster URL poster_url = None images = anime_data.get("images", []) if images: for img in images: if img.get("facet") == "Large Cover": poster_url = img.get("link") break if not poster_url and images: poster_url = images[0].get("link") anime = Anime( animethemes_slug=slug, title_english=anime_data.get("name"), year=anime_data.get("year"), poster_url=poster_url, ) db.add(anime) await db.commit() await db.refresh(anime) return anime async def fetch_themes(self, db: AsyncSession, anime: Anime) -> List[AnimeTheme]: """Fetch themes from AnimeThemes API and sync to DB.""" # Always reload anime with themes to avoid lazy loading issues result = await db.execute( select(Anime) .where(Anime.id == anime.id) .options(selectinload(Anime.themes)) ) anime = result.scalar_one() current_themes = anime.themes or [] if not anime.animethemes_slug: logger.warning(f"No AnimeThemes slug for anime {anime.id}") return current_themes # Fetch themes from AnimeThemes API try: response = await self.client.get( f"/anime/{anime.animethemes_slug}", params={ "include": "animethemes.animethemeentries.videos.audio,animethemes.song.artists", }, ) if response.status_code != 200: logger.warning(f"AnimeThemes API returned {response.status_code} for {anime.animethemes_slug}") return current_themes data = response.json() except Exception as e: logger.error(f"Failed to fetch themes from AnimeThemes for {anime.animethemes_slug}: {e}") return current_themes anime_data = data.get("anime", {}) themes_data = anime_data.get("animethemes", []) logger.info(f"AnimeThemes API returned {len(themes_data)} themes for {anime.animethemes_slug}") # Build dict of existing themes existing_themes = { (t.theme_type, t.sequence): t for t in current_themes } for theme_data in themes_data: # Parse theme type and sequence: "OP1", "ED1", etc. slug = theme_data.get("slug", "") # e.g., "OP1", "ED1" match = re.match(r"(OP|ED)(\d*)", slug) if not match: continue theme_type = ThemeType.OP if match.group(1) == "OP" else ThemeType.ED sequence = int(match.group(2)) if match.group(2) else 1 # Get video URL (prioritize audio link, then video link) video_url = None entries = theme_data.get("animethemeentries", []) if entries: videos = entries[0].get("videos", []) if videos: # Try to get audio link first audio = videos[0].get("audio") if audio: video_url = audio.get("link") # Fallback to video link if not video_url: video_url = videos[0].get("link") # Get song info song_data = theme_data.get("song", {}) song_title = song_data.get("title") artist = None artists = song_data.get("artists", []) if artists: artist = artists[0].get("name") key = (theme_type, sequence) if key in existing_themes: # Update existing theme theme = existing_themes[key] theme.song_title = song_title theme.artist = artist if video_url: theme.animethemes_video_url = video_url else: # Create new theme theme = AnimeTheme( anime_id=anime.id, theme_type=theme_type, sequence=sequence, song_title=song_title, artist=artist, animethemes_video_url=video_url, ) db.add(theme) if anime.themes is None: anime.themes = [] anime.themes.append(theme) await db.commit() # Reload anime with themes to get fresh data result = await db.execute( select(Anime) .where(Anime.id == anime.id) .options(selectinload(Anime.themes)) ) refreshed_anime = result.scalar_one() return refreshed_anime.themes